thingsboard

thingsboard源码解析(1)--Actor系统流程

2022-03-17  本文已影响0人  永和包仔

1. application 的目录架构

└── java
    └── org/thingsboard/server
        ├── ThingsboardInstallApplication.java (程序初始化入口-安装脚本等)
        ├── ThingsboardServerApplication.java (程序服务启动入口)
        ├── actors (Actor模型消息处理核心逻辑)
        ├── config (配置类)
        ├── controller (服务接口层)
        ├── exception (异常)
        ├── install (安装相关)
        ├── service (服务层)
        └── utils (工具类)
└── resources
    ├── banner.txt (应用启动时的控制台输出logo)
    ├── i18n (国际化相关)
    ├── logback.xml (日志框架的配置文件)
    ├── templates (模版)
    └── thingsboard.yml (配置文件)    

从java的入口中有两个:

org/thingsboard/server/ThingsboardInstallApplication.java
org/thingsboard/server/ThingsboardServerApplication.java

其中ThingsboardInstallApplication这个启动项对应的是tb的安装服务,主要是初始化数据库相关的信息;而ThingsboardServerApplication则对应core和rule-engine服务。IDE导入源码后,通过不同的启动类的方式来启动指定具体要启动的服务。

2. 项目初始化

由于项目是springboot 框架,在实际项目中,服务启动后做一些初始化工作,例如线程池初始化、文件资源加载、常驻后台任务启动(比如kafka consumer)等。一般初始化资源的方法有3类

所以先搜索
@EventListener看看如下图

image.png

@PostConstruct看看如下图

image.png

看到log.info("Initializing actor system.");的注释,初步怀疑从这里开始。所以进入到这个函数再细看下:

    public void initActorSystem() {
        log.info("Initializing actor system.");
        actorContext.setActorService(this);  //初始化上下文
        TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
        system = new DefaultTbActorSystem(settings);//初始化系统,主要初始化任务执行扔线程池
        
        //初始化派发
        system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
        system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
        system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
        system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

        actorContext.setActorSystem(system);

        appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
        actorContext.setAppActor(appActor);

        TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
        actorContext.setStatsActor(statsActor);

        log.info("Actor system initialized.");
    }

DefaultActorService 完成了acotr的初始化:

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (!ruleChainsInitialized) {
            "initTenantActors();"//初始化租户Actor
            ruleChainsInitialized = true;
            if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
                log.warn("Rule Chains initialized by unexpected message: {}", msg);
            }
        }
......
    private void initTenantActors() {
        log.info("Starting main system actor.");
        try {
            // This Service may be started for specific tenant only.
            Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            if (isolatedTenantId.isPresent()) {
                Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                if (tenant != null) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId());
                    log.debug("Tenant actor created.");
                } else {
                    log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                }
            } else if (systemContext.isTenantComponentsInitEnabled()) {
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                for (Tenant tenant : tenantIterator) {
                    TenantProfile tenantProfile = tenantProfileCache.get(tenant.getTenantProfileId());
                    if (isCore || (isRuleEngine && !tenantProfile.isIsolatedTbRuleEngine())) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("[{}] Tenant actor created.", tenant.getId());
                    }
                }
            }
            log.info("Main system actor started.");
        } catch (Exception e) {
            log.warn("Unknown failure", e);
        }
    }

    private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
        return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
                () -> DefaultActorService.TENANT_DISPATCHER_NAME,
                () -> new TenantActor.ActorCreator(systemContext, tenantId));
    }

image.png
上一篇下一篇

猜你喜欢

热点阅读