SkyWalking Source Code Analysis: OAP Server

2019-09-27  DoubleFooker

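Basic concepts

ApplicationConfiguration: the application configuration

ModuleManager: the module manager

ModuleDefine: the definition of a module

ModuleProvider: a provider of a module

Service: an interface type designed as an abstraction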

Startup process

The code of OAPServerStartUp gives a first impression of SkyWalking's coding style and design.

public class OAPServerStartUp {
    private static final Logger logger = LoggerFactory.getLogger(OAPServerStartUp.class);
    public static void main(String[] args) {
        String mode = System.getProperty("mode");
        RunningMode.setMode(mode);
        ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
        ModuleManager manager = new ModuleManager();
        try {
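            // Parse the configuration: read the settings from application.yml
            ApplicationConfiguration applicationConfiguration = configLoader.load();

            // Key step: initialize the container and start every module
            manager.init(applicationConfiguration);

            // Register a gauge that records the startup time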
            manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class).createGauge("uptime",
                    "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
                    // Set uptime to second
                    .setValue(System.currentTimeMillis() / 1000d);
            if (RunningMode.isInitMode()) {
                logger.info("OAP starts up in init mode successfully, exit now...");
                System.exit(0);
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
            System.exit(1);
        }
    }
}

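The main method is simple: it loads the configuration file and calls the manager's init method to initialize the server.
The configuration format is shown below, taking the core module as an example.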

# ModuleConfiguration
core: # ModuleName
  # ProviderConfiguration
  default: # ProviderName
    # Properties of the provider
    role: ${SW_CORE_ROLE:Mixed} 
    restHost: ${SW_CORE_REST_HOST:0.0.0.0}
    restPort: ${SW_CORE_REST_PORT:12800}
    restContextPath: ${SW_CORE_REST_CONTEXT_PATH:/}
    gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0}
    gRPCPort: ${SW_CORE_GRPC_PORT:11800}
    downsampling:
      - Hour
      - Day
      - Month
    enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} 
    dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} 
    recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} 
    minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} 
    hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} 
    dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} 
    monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} 
    enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
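
The values above use the ${NAME:default} form. Here is a minimal sketch of how such a placeholder could be resolved, assuming NAME is looked up in the environment and the text after the colon is the fallback. PlaceholderSketch and resolve are hypothetical names for illustration, not SkyWalking's own loader code.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {
    // Matches ${NAME} or ${NAME:default}; group 1 is the name, group 2 the optional default.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Replaces every placeholder with the value from env, or with its default when env has no entry.
    static String resolve(String raw, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String fallback = matcher.group(2) == null ? "" : matcher.group(2);
            String value = env.getOrDefault(name, fallback);
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Prints the environment value if SW_CORE_REST_PORT is set, otherwise the default.
        System.out.println(resolve("${SW_CORE_REST_PORT:12800}", System.getenv()));
    }
}
```

With this reading, restPort stays 12800 unless SW_CORE_REST_PORT is set in the environment.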

Next, follow the manager.init method.

    public void init(
        ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
        String[] moduleNames = applicationConfiguration.moduleList();
        // 1. Use SPI to discover all module definitions (ModuleDefine implementations)
        ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
        // Use SPI to discover all module providers (ModuleProvider implementations)
        ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
        // Modules declared in the configuration file
        LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
        // Check that every module declared in the configuration file exists in the system
        for (ModuleDefine module : moduleServiceLoader) {
            for (String moduleName : moduleNames) {
                if (moduleName.equals(module.name())) {
                    ModuleDefine newInstance;
                    try {
                        // Create the instance by reflection
                        newInstance = module.getClass().newInstance();
                    } catch (InstantiationException | IllegalAccessException e) {
                        throw new ModuleNotFoundException(e);
                    }
                    // 2. Module prepare stage
                    newInstance.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
                    loadedModules.put(moduleName, newInstance);
                    moduleList.remove(moduleName);
                }
            }
        }
        // Finish prepare stage
        isInPrepareStage = false;
        // Throw if the configuration names a module that was not found
        if (moduleList.size() > 0) {
            throw new ModuleNotFoundException(moduleList.toString() + " missing.");
        }
        // 3. All modules are loaded; start the services
        BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
        bootstrapFlow.start(this);
        bootstrapFlow.notifyAfterCompleted();
    }

The numbered steps in the comments above are analyzed one by one below.

1. Loading module information through SPI

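The Java SPI mechanism loads the classes listed in the files under the META-INF/services directory and instantiates them.
Taking CoreModule as an example, its META-INF/services directory contains the files
org.apache.skywalking.oap.server.library.module.ModuleDefine
org.apache.skywalking.oap.server.library.module.ModuleProvider

Content of the ModuleDefine file: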

org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
org.apache.skywalking.oap.server.core.query.QueryModule
org.apache.skywalking.oap.server.core.alarm.AlarmModule
org.apache.skywalking.oap.server.core.exporter.ExporterModule

Content of the ModuleProvider file:

org.apache.skywalking.oap.server.core.CoreModuleProvider

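Once loaded, the modules are iterated, and an instance of each module that matches the configuration is created by reflection.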
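
The matching step can be sketched in isolation. This is a simplified illustration rather than the SkyWalking code: a single-file example cannot ship META-INF/services files, so a plain list stands in for the ServiceLoader result, and Module, Core, Storage and load are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

final class ModuleMatchSketch {
    // Hypothetical stand-in for ModuleDefine.
    abstract static class Module {
        abstract String name();
    }

    static final class Core extends Module {
        String name() { return "core"; }
    }

    static final class Storage extends Module {
        String name() { return "storage"; }
    }

    // Keeps only the discovered modules that the configuration names,
    // creating a fresh instance of each by reflection.
    static Map<String, Module> load(List<Module> discovered, List<String> configured) {
        Map<String, Module> loaded = new HashMap<>();
        List<String> pending = new LinkedList<>(configured);
        for (Module module : discovered) {
            if (pending.remove(module.name())) {
                try {
                    loaded.put(module.name(), module.getClass().getDeclaredConstructor().newInstance());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        // Any configured name left over has no matching module.
        if (!pending.isEmpty()) {
            throw new IllegalStateException(pending + " missing.");
        }
        return loaded;
    }
}
```

A discovered module that the configuration does not name is simply skipped, while a configured name with no discovered module fails the whole startup.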

2. The prepare method of ModuleDefine

    void prepare(ModuleManager moduleManager, ApplicationConfiguration.ModuleConfiguration configuration,
        ServiceLoader<ModuleProvider> moduleProviderLoader) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
        // Iterate over all providers
        for (ModuleProvider provider : moduleProviderLoader) {
            // Only handle providers that appear in the configuration
            if (!configuration.has(provider.name())) {
                continue;
            }
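            // Check that the module declared by the provider is the current ModuleDefine, which establishes a one-to-one relation
            // and prevents several providers from being bound to the same module instance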
            if (provider.module().equals(getClass())) {
                if (loadedProvider == null) {
                    loadedProvider = provider;
                    loadedProvider.setManager(moduleManager);
                    loadedProvider.setModuleDefine(this);
                } else {
                    // ModuleName + " module has one " + ProviderName + "[" + ProviderClass + "] provider already, "
                    throw new DuplicateProviderException(this.name() + " module has one " + loadedProvider.name() + "[" + loadedProvider.getClass().getName() + "] provider already, "
                        + provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
                }
            }

        }

        if (loadedProvider == null) {
            throw new ProviderNotFoundException(this.name() + " module no provider exists.");
        }

        logger.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
        try {
            // Load the ModuleConfig, which is managed by the provider
            copyProperties(loadedProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(loadedProvider.name()), this.name(), loadedProvider.name());
        } catch (IllegalAccessException e) {
            throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
        }
        // Prepare stage of the provider
        loadedProvider.prepare();
    }

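The main job of ModuleDefine's prepare stage is to bind a provider. A module must be bound to exactly one provider; otherwise an exception is thrown. Once the binding is established, the configuration is loaded into the provider, and then the provider's own prepare stage runs.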
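
The "exactly one provider" rule can be sketched on its own. This is a simplified illustration under assumptions: modules are identified by name instead of by class, and ProviderBindingSketch, Provider and bind are hypothetical names.

```java
import java.util.List;
import java.util.Set;

final class ProviderBindingSketch {
    // Hypothetical stand-in for ModuleProvider: a provider name plus the module it belongs to.
    static final class Provider {
        final String name;
        final String module;

        Provider(String name, String module) {
            this.name = name;
            this.module = module;
        }
    }

    // Returns the single candidate that is both configured and declared for the given module.
    static Provider bind(String module, Set<String> configuredProviders, List<Provider> candidates) {
        Provider chosen = null;
        for (Provider candidate : candidates) {
            if (!configuredProviders.contains(candidate.name)) {
                continue; // not named in the configuration
            }
            if (!candidate.module.equals(module)) {
                continue; // belongs to another module
            }
            if (chosen != null) {
                throw new IllegalStateException(module + " module has one " + chosen.name
                    + " provider already, " + candidate.name + " is defined as 2nd provider.");
            }
            chosen = candidate;
        }
        if (chosen == null) {
            throw new IllegalStateException(module + " module no provider exists.");
        }
        return chosen;
    }
}
```
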
Next, follow the provider's prepare method. ModuleProvider.prepare is abstract and is implemented by each concrete subclass. Take CoreModuleProvider as an example:

    @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        // Not expanded for now; some code is omitted here
        this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
        this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
        this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
        this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
        this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
        this.registerServiceImplementation(SourceReceiver.class, receiver);
        WorkerInstancesService instancesService = new WorkerInstancesService();
        this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
        this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
        this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
        this.registerServiceImplementation(IModelSetter.class, storageModels);
        this.registerServiceImplementation(IModelGetter.class, storageModels);
        this.registerServiceImplementation(IModelOverride.class, storageModels);
        this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
        this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
        this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager()));
        this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
        this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager()));
        this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
        this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager()));
        this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
        this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
        this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
        this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
        this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
        this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
        this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
        this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
        this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
        this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
        this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
    }

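The details of CoreModuleProvider are not covered here. Compared with other providers, every prepare implementation calls this.registerServiceImplementation() to register the Service instances that the provider is required to supply.
The prepare stage ends once every ModuleDefine has run prepare.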
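
The registration pattern boils down to a map from a service interface to its implementation. Below is a minimal sketch of that idea, assuming a generic register/getService pair; ServiceRegistrySketch and its members are hypothetical names, and the real ModuleProvider may perform further checks.

```java
import java.util.HashMap;
import java.util.Map;

final class ServiceRegistrySketch {
    // Hypothetical marker interface, standing in for SkyWalking's Service.
    interface Service {
    }

    // Hypothetical example service.
    interface ConfigService extends Service {
        int restPort();
    }

    private final Map<Class<? extends Service>, Service> services = new HashMap<>();

    // Registers an implementation under its service interface.
    <T extends Service> void register(Class<T> type, T implementation) {
        services.put(type, implementation);
    }

    // Looks an implementation up by its service interface.
    <T extends Service> T getService(Class<T> type) {
        Service found = services.get(type);
        if (found == null) {
            throw new IllegalStateException("Service " + type.getName() + " is not registered.");
        }
        return type.cast(found);
    }

    public static void main(String[] args) {
        ServiceRegistrySketch provider = new ServiceRegistrySketch();
        ConfigService config = () -> 12800;
        provider.register(ConfigService.class, config);
        System.out.println(provider.getService(ConfigService.class).restPort());
    }
}
```

Keying the map by interface is what lets one object serve several roles, as with instancesService and storageModels in the listing above.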

3. Running BootstrapFlow

When a BootstrapFlow is created, the key part is the makeSequence method.

    BootstrapFlow(Map<String, ModuleDefine> loadedModules) throws CycleDependencyException {
        this.loadedModules = loadedModules;
        startupSequence = new LinkedList<>();
        makeSequence();
    }

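makeSequence repeatedly scans the providers and appends a provider to the startup sequence once every module it requires is already in that sequence. If a full pass makes no progress, a cyclic dependency exists and an exception is thrown.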

    private void makeSequence() throws CycleDependencyException {
        List<ModuleProvider> allProviders = new ArrayList<>();
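        // Collect the providers that still need to be sequenced
        loadedModules.forEach((moduleName, module) -> allProviders.add(module.provider()));
        // Check whether the modules each provider depends on are already sequenced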
        do {
            int numOfToBeSequenced = allProviders.size();
            for (int i = 0; i < allProviders.size(); i++) {
                ModuleProvider provider = allProviders.get(i);
                // Modules that this provider declares as required
                String[] requiredModules = provider.requiredModules();
                if (CollectionUtils.isNotEmpty(requiredModules)) {
                    boolean isAllRequiredModuleStarted = true;
                    for (String module : requiredModules) {
                        // find module in all ready existed startupSequence
                        boolean exist = false;
                        for (ModuleProvider moduleProvider : startupSequence) {
                            if (moduleProvider.getModuleName().equals(module)) {
                                exist = true;
                                break;
                            }
                        }
                        // A required module is not sequenced yet: stop checking and retry in the next pass
                        if (!exist) {
                            isAllRequiredModuleStarted = false;
                            break;
                        }
                    }
                    if (isAllRequiredModuleStarted) {
                        startupSequence.add(provider);
                        // Sequenced: remove it from the pending list
                        allProviders.remove(i);
                        i--;
                    }
                } else {
                    // No dependencies: sequence it right away
                    startupSequence.add(provider);
                    allProviders.remove(i);
                    i--;
                }
            }
            // Cycle detection: if MA requires MB and MB requires MA, neither can ever be removed from allProviders, so the pass makes no progress
            if (numOfToBeSequenced == allProviders.size()) {
                StringBuilder unSequencedProviders = new StringBuilder();
                allProviders.forEach(provider -> unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n"));
                throw new CycleDependencyException("Exist cycle module dependencies in \n" + unSequencedProviders.substring(0, unSequencedProviders.length() - 1));
            }
        }
        while (allProviders.size() != 0);
    }
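
The same ordering idea can be sketched without the surrounding classes. This is a simplified illustration, assuming modules are plain names and dependencies are given as a map; StartupOrderSketch and order are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class StartupOrderSketch {
    // requires maps each module name to the names of the modules it depends on.
    static List<String> order(Map<String, List<String>> requires) {
        List<String> pending = new ArrayList<>(requires.keySet());
        List<String> sequence = new ArrayList<>();
        while (!pending.isEmpty()) {
            int before = pending.size();
            Iterator<String> it = pending.iterator();
            while (it.hasNext()) {
                String module = it.next();
                // A module can be sequenced once all of its requirements are sequenced.
                if (sequence.containsAll(requires.get(module))) {
                    sequence.add(module);
                    it.remove();
                }
            }
            // A full pass without progress means the remaining modules wait on each other
            // (a requirement naming an unknown module also ends up here).
            if (pending.size() == before) {
                throw new IllegalStateException("Exist cycle module dependencies in " + pending);
            }
        }
        return sequence;
    }
}
```

Each pass sequences at least one module or fails, so the loop always terminates.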

Next is BootstrapFlow.start(), which handles the start stage of the providers and also verifies that each provider has registered the services declared by its module.

    void start(
        ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
        for (ModuleProvider provider : startupSequence) {
            String[] requiredModules = provider.requiredModules();
            if (requiredModules != null) {
                for (String module : requiredModules) {
                    if (!moduleManager.has(module)) {
                        throw new ModuleNotFoundException(module + " is required by " + provider.getModuleName()
                            + "." + provider.name() + ", but not found.");
                    }
                }
            }
            logger.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
            // Check that an implementation has been registered for every service class declared by the associated ModuleDefine
            provider.requiredCheck(provider.getModule().services());
            // Start
            provider.start();
        }
    }
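
The service check before start can be sketched as follows. This is a minimal illustration of the rule "every declared service must have a registered implementation"; RequiredCheckSketch and its members are hypothetical names, and the real requiredCheck may enforce more than this.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RequiredCheckSketch {
    // Hypothetical marker interface and two example services.
    interface Service {
    }

    interface QueryService extends Service {
    }

    interface StorageService extends Service {
    }

    private final Map<Class<? extends Service>, Service> services = new HashMap<>();

    <T extends Service> void register(Class<T> type, T implementation) {
        services.put(type, implementation);
    }

    // Fails if the module declares a service for which the provider registered no implementation.
    void requiredCheck(List<Class<? extends Service>> declared) {
        for (Class<? extends Service> type : declared) {
            if (!services.containsKey(type)) {
                throw new IllegalStateException("Service " + type.getName() + " is not provided.");
            }
        }
    }
}
```

Running the check before start means a provider with a missing service fails early instead of at the first lookup.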

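The provider's start method is abstract and is implemented by each concrete class.
After start has finished, notifyAfterCompleted is called on the BootstrapFlow. It iterates over the started providers and calls notifyAfterCompleted on each of them as a startup-completed callback. Like start, notifyAfterCompleted is abstract and is implemented by each concrete class.
This completes the OAP Server startup process.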
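
The three phases can be summarized in a small sketch. It is a simplification under assumptions: one list is used for all phases, whereas in the code above prepare runs in module discovery order and start and notifyAfterCompleted run in startup-sequence order. LifecycleSketch, Provider and boot are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LifecycleSketch {
    // Hypothetical provider that only records which phase was called.
    static final class Provider {
        final String name;

        Provider(String name) {
            this.name = name;
        }

        void prepare(List<String> log) {
            log.add("prepare " + name);
        }

        void start(List<String> log) {
            log.add("start " + name);
        }

        void notifyAfterCompleted(List<String> log) {
            log.add("notify " + name);
        }
    }

    // Each phase runs for every provider before the next phase begins.
    static List<String> boot(List<Provider> providers) {
        List<String> log = new ArrayList<>();
        for (Provider provider : providers) {
            provider.prepare(log);
        }
        for (Provider provider : providers) {
            provider.start(log);
        }
        for (Provider provider : providers) {
            provider.notifyAfterCompleted(log);
        }
        return log;
    }

    public static void main(String[] args) {
        boot(Arrays.asList(new Provider("storage"), new Provider("core"))).forEach(System.out::println);
    }
}
```

Because each phase finishes for all providers first, a provider's start can rely on every service registered during prepare, and notifyAfterCompleted can rely on every provider having started.
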
A flow chart helps in following the process:

[Figure: flow chart of the OAP Server startup process (image.png)]

This article is based on version 6.4.0.
Some of the terms are my own definitions and may not be rigorous; corrections are welcome.
