15、Skywalking的存储链路初始化

2021-06-09  本文已影响0人  rock_fish

扫描@Stream

sw在启动时候CoreModuleProvider#start中的annotationScan.scan()触发调用StreamAnnotationListener#notify这里会处理带有@Stream注解的类,比如:

@Stream(name = SegmentRecord.INDEX_NAME, scopeId = DefaultScopeDefine.SEGMENT, builder = SegmentRecord.Builder.class, processor = RecordStreamProcessor.class)
public class SegmentRecord extends Record

这个@Stream的作用非常强大:尤其是processor是核心,这里列举几个processor的主要功能:

剩余的几个参数将在processor工作时被用到,这里先简单介绍一下:

name = SegmentRecord.INDEX_NAME  //指定这类数据所存储的es索引的名称
scopeId = DefaultScopeDefine.SEGMENT // 数据类型标识
builder = SegmentRecord.Builder.class //将数据转和Map之间的互换;比如存储时:SegmentRecord 转换成map,再将map构造成IndexRequest的内容、
创建Model

StreamProcessor#create方法中会根据@Stream注解中的信息,以及被其注解的类的一些信息,来创建Model,比如:

Model对象被创建之后会被添加到StorageModels中,StorageModels作为管理的角色,基于Model提供了更多的功能,其中很重要的一个是 可以在Server启动的时候,根据配置,通过Model中所承载的名称,字段描述等信息来创建es索引/模板;起到初始化存储环境的作用;其基本实现逻辑是:

ModelCreator通过监听器的模式给使用者提供扩展实现,实现类ModelInstaller的监听方法whenCreating中提供了createTable(model)这个抽象方法,子类各自定义具体的实现;比如StorageEsInstaller中会做es相关索引、模板的初始化。

Model承载的信息:

name(String类型):Metric 名称,在 OAP 创建 ES 索引时会使用
columns(List类型):ES 索引 中的 Field 集合,一个 ModelColumn 对象记录了一个 Field 的名称、类型等信息。
capableOfTimeSeries(boolean类型):对应 ES 索引中存储的数据是否为时间相关的数据。
downsampling(Downsampling类型):如果是时间相关的数据,则需要指定其 Downsampling 单位,可选值有 Second、 Minute、 Hour、 Day、 Month。对于非时间相关的数据,则该字段值为 Downsampling.NONE。
deleteHistory(boolean类型):是否删除历史数据。
scopeId(int类型):对应指标的全局唯一 ID。
创建DAO

创建StorageBuilder,让后用StorageBuilder创建DAO
StorageBuilder用于构建真实的存储需要使用的数据,比如es存储的情况下:
StorageData对象转换成Map,再将Map构建ES的XContentBuilder,进而创建IndexRequest。比如RecordEsDAO#prepareBatchInsert:

    @Override
    public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
        //1.storageBuilder.entity2Storage(record) 将Record转换成Map
        XContentBuilder builder = map2builder(
            IndexController.INSTANCE.appendMetricTableColumn(model, storageBuilder.entity2Storage(record)));
        String modelName = TimeSeriesUtils.writeIndexName(model, record.getTimeBucket());
        String id = IndexController.INSTANCE.generateDocId(model, record.id());
        return getClient().prepareInsert(modelName, id, builder);
    }

    // 2.map 转成 es 的XContentBuilder
    protected XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
        for (Map.Entry<String, Object> entries: objectMap.entrySet()) {
            Object value = entries.getValue();
            String key = entries.getKey();
            if (value instanceof StorageDataComplexObject) {
                builder.field(key, ((StorageDataComplexObject) value).toStorageData());
            } else {
                builder.field(key, value);
            }
        }
        builder.endObject();

        return builder;
    }
创建StreamProcessor,构造数据处理Worker链

StreamProcessor构建了接收到数据之后,进一步处理数据,存储数据相关的对象以及链路。比如:创建Model对象,创建Dao对象,创建一组AbstractWorker对象,这些AbstractWorker构成任务链,顺序执行,各自完成一部分功能。 比如RecordPersistentWorker就负责在接收数据环节调用dao进行存储。

StreamProcessor有以下实现:

ManagementStreamProcessor
MetricsStreamProcessor  //Metric
NoneStreamProcessor
RecordStreamProcessor // TraceSegment
TopNStreamProcessor //慢xx
上一篇 下一篇

猜你喜欢

热点阅读