15、Skywalking的存储链路初始化
扫描@Stream
sw在启动时候CoreModuleProvider#start
中的annotationScan.scan()
触发调用StreamAnnotationListener#notify
这里会处理带有@Stream
注解的类,比如:
@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record
这个@Stream
的作用非常强大:尤其是processor
是核心,这里列举几个processor
的主要功能:
- 创建Model;orm中需要一个对象来保存跟存储相关的信息,在skywalking中就是Model
- 构建了接收到数据之后,进一步处理数据,存储数据相关的对象(DAO,Model)以及工作链(一组
StreamProcessor
)。
剩余的几个参数将在processor
工作时被用到,这里先简单介绍一下:
name = SegmentRecord.INDEX_NAME //指定这类数据所存储的es索引的名称
scopeId = DefaultScopeDefine.SEGMENT // 数据类型标识
builder = SegmentRecord.Builder.class //将数据转和Map之间的互换;比如存储时:SegmentRecord 转换成map,再将map构造成IndexRequest的内容、
创建Model
StreamProcessor#create
方法中会根据@Stream
注解中的信息,以及被其注解的类的一些信息,来创建Model,比如:
- 根据@Stream中的name ,确定索引、模板的名称
- 扫描
StorageData
中的 @Column 注解,确定 ES 索引中的字段名称`
Model对象被创建之后会被添加到StorageModels中,StorageModels作为管理的角色,基于Model提供了更多的功能,其中很重要的一个是 可以在Server启动的时候,根据配置,通过Model中所承载的名称,字段描述等信息来创建es索引/模板;起到初始化存储环境的作用;其基本实现逻辑是:
ModelCreator通过监听器的模式给使用者提供扩展实现,实现类ModelInstaller
的监听方法whenCreating
中提供了createTable(model)
这个抽象方法,子类各自定义具体的实现;比如StorageEsInstaller
中会做es相关索引、模板的初始化。
Model承载的信息:
name(String类型):Metric 名称,在 OAP 创建 ES 索引时会使用
columns(List类型):ES 索引 中的 Field 集合,一个 ModelColumn 对象记录了一个 Field 的名称、类型等信息。
capableOfTimeSeries(boolean类型):对应 ES 索引中存储的数据是否为时间相关的数据。
downsampling(Downsampling类型):如果是时间相关的数据,则需要指定其 Downsampling 单位,可选值有 Second、 Minute、 Hour、 Day、 Month。对于非时间相关的数据,则该字段值为 Downsampling.NONE。
deleteHistory(boolean类型):是否删除历史数据。
scopeId(int类型):对应指标的全局唯一 ID。
创建DAO
创建StorageBuilder
,让后用StorageBuilder
创建DAO
;
StorageBuilder
用于构建真实的存储需要使用的数据,比如es存储的情况下:
将StorageData
对象转换成Map,再将Map构建ES的XContentBuilder
,进而创建IndexRequest
。比如RecordEsDAO#prepareBatchInsert
:
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
//1.storageBuilder.entity2Storage(record) 将Record转换成Map
XContentBuilder builder = map2builder(
IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, record.id());
return getClient().prepareInsert(modelName, id, builder);
}
// 2.map 转成 es 的XContentBuilder
protected XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, Object> entries: objectMap.entrySet()) {
Object value = entries.getValue();
String key = entries.getKey();
if (value instanceof StorageDataComplexObject) {
builder.field(key, ((StorageDataComplexObject) value).toStorageData());
} else {
builder.field(key, value);
}
}
builder.endObject();
return builder;
}
创建StreamProcessor,构造数据处理Worker链
StreamProcessor
构建了接收到数据之后,进一步处理数据,存储数据相关的对象以及链路。比如:创建Model对象,创建Dao对象,创建一组AbstractWorker
对象,这些AbstractWorker
构成任务链,顺序执行,各自完成一部分功能。 比如RecordPersistentWorker
就负责在接收数据环节调用dao进行存储。
StreamProcessor有以下实现:
ManagementStreamProcessor
MetricsStreamProcessor //Metric
NoneStreamProcessor
RecordStreamProcessor // TraceSegment
TopNStreamProcessor //慢xx