框架原理elasticsearchES搜索系列

玩转Elasticsearch源码-一图看懂ES启动流程

2019-01-07  本文已影响27人  左手java右手go

开篇

直接看图


ES启动流程.png

上图中虚线表示进入具体流程,实线表示下一步,为了后面讲解方便每个步骤都加了编号。
先简单介绍下启动流程主要涉及的类:

流程讲解

  1. main方法
  2. 设置了一个空的SecurityManager:

// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
//我们希望JVM认为已经安装了一个安全管理器,这样,如果基于安全管理器的存在或缺少安全管理器的内部策略决策就会像有一个安全管理器一样(e.g.、DNS缓存策略)
// grant all permissions so that we can later set the security manager to the one that we want
//授予所有权限,以便稍后可以将安全管理器设置为所需的权限

添加StatusConsoleListener到STATUS_LOGGER:

We want to detect situations where we touch logging before the configuration is loaded . If we do this , Log 4 j will status log an error message at the error level . With this error listener , we can capture if this happens . More broadly , we can detect any error - level status log message which likely indicates that something is broken . The listener is installed immediately on startup , and then when we get around to configuring logging we check that no error - level log messages have been logged by the status logger . If they have we fail startup and any such messages can be seen on the console
我们希望检测在加载配置之前进行日志记录的情况。如果这样做,log4j将在错误级别记录一条错误消息。使用这个错误监听器,我们可以捕捉到这种情况。更广泛地说,我们可以检测任何错误级别的状态日志消息,这些消息可能表示某个东西坏了。侦听器在启动时立即安装,然后在配置日志记录时,我们检查状态日志记录器没有记录错误级别的日志消息。如果它们启动失败,我们可以在控制台上看到任何此类消息。

实例化Elasticsearch:

Elasticsearch() {
        super("starts elasticsearch", () -> {}); // () -> {} 是启动前的回调
        //下面解析version,daemonize,pidfile,quiet参数
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }

3.注册ShutdownHook,用于关闭系统时捕获IOException到terminal

            shutdownHookThread = new Thread(() -> {
                try {
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        e.printStackTrace(pw);
                        terminal.println(sw.toString());
                    } catch (final IOException impossible) {
                        // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                        // say that an exception here is impossible
                        throw new AssertionError(impossible);
                    }
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);

然后调用beforeMain.run(),其实就是上面实例化Elasticsearch对象时创建的()->{} lambda表达式。
4.进入Command类的mainWithoutErrorHandling方法

 void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);//根据提供给解析器的选项规范解析给定的命令行参数

        if (options.has(helpOption)) {
            printHelp(terminal);
            return;
        }

        if (options.has(silentOption)) {//terminal打印最少内容
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {//terminal打印详细内容
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }

        execute(terminal, options);
    }

5.进入EnvironmentAwareCommand的execute方法

protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }

        //确保给定的设置存在,如果尚未设置,则从系统属性中读取它。
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

        execute(terminal, options, createEnv(terminal, settings));
    }

6.进入InternalSettingsPreparer的prepareEnvironment方法,读取elasticsearch.yml并创建Environment。细节比较多,后面再细讲。


Environment对象.png

7.判断是否有-v参数,没有则准备进入init流程

 protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        if (options.has(versionOption)) { //如果有 -v 参数,打印版本号后直接退出
            terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
                    + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                    + ", JVM: " + JvmInfo.jvmInfo().version());
            return;
        }

        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }

8.调用Bootstrap.init
9.实例化Boostrap。保持keepAliveThread存活,可能是用于监控

Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown 保持这个线程存活(非守护进程线程),直到我们关机
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

10.加载elasticsearch.keystore文件,重新创建Environment,然后调用LogConfigurator的静态方法configure,读取config目录下log4j2.properties然后配log4j属性
11.创建pid文件,检查lucene版本,不对应则抛出异常

 private static void checkLucene() {
        if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {
            throw new AssertionError("Lucene version mismatch this version of Elasticsearch requires lucene version ["
                + Version.CURRENT.luceneVersion + "]  but the current lucene version is [" + org.apache.lucene.util.Version.LATEST + "]");
        }
    }

12.设置ElasticsearchUncaughtExceptionHandler用于打印fatal日志

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // 安装默认未捕获异常处理程序;必须在初始化security之前完成,因为我们不想授予运行时权限
            // setDefaultUncaughtExceptionHandler
            Thread.setDefaultUncaughtExceptionHandler(
                new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));

13.进入Boostrap.setup
14.spawner.spawnNativePluginControllers(environment);尝试为给定模块生成控制器(native Controller)守护程序。 生成的进程将通过其stdin,stdout和stderr流保持与此JVM的连接,但对此包之外的代码不能使用对这些流的引用。
15.初始化本地资源 initializeNatives():检查用户是否作为根用户运行,是的话抛异常;系统调用和mlockAll检查;尝试设置最大线程数,最大虚拟内存,最大FD等。
初始化探针initializeProbes(),用于操作系统,进程,jvm的监控。
16.又加一个ShutdownHook

        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    }
                }
            });
        }

17.比较简单,直接看代码

 try {
            // look for jar hell
            JarHell.checkJarHell();
        } catch (IOException | URISyntaxException e) {
            throw new BootstrapException(e);
        }

        // Log ifconfig output before SecurityManager is installed
        IfConfig.logIfNecessary();

        // install SM after natives, shutdown hooks, etc.
        try {
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new BootstrapException(e);
        }

18.实例化Node,重写validateNodeBeforeAcceptingRequests方法
19.进入Boostrap.start
20.node.start启动节点
21.keepAliveThread.start
22.Node实例化第一步,创建NodeEnvironment


NodeEnvironment.png

23.生成nodeId,打印nodeId,nodeName和jvmInfo和进程信息
24.创建 PluginsService 对象,创建过程中会读取并加载所有的模块和插件

25.又创建Environment

            // create the environment based on the finalized (processed) view of the settings 根据设置的最终(处理)视图创建环境
            // this is just to makes sure that people get the same settings, no matter where they ask them from 这只是为了确保人们得到相同的设置,无论他们从哪里询问
            this.environment = new Environment(this.settings, environment.configFile());

26.创建ThreadPool,然后给DeprecationLogger设置ThreadContext
27.创建NodeClient,用于执行actions
28.创建各个Service:ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterService
29.创建并添加modules:ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModule
30.Guice绑定和注入对象
31.初始化NodeClient

            client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                    () -> clusterService.localNode().getId());

32.初始化rest处理器,这个非常重要,后面会专门讲解

if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ..."); // 初始化http handler
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }

33.修改状态为State.STARTED
34.启动pluginLifecycleComponents
35.通过 injector 获取各个类的对象,调用 start() 方法启动(实际进入各个类的中 doStart 方法): LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportService
36.启动HttpServerTransport和TransportService并绑定端口

if (WRITE_PORTS_FILE_SETTING.get(settings)) {
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
                writePortsFile("http", http.boundAddress());
            }
            TransportService transport = injector.getInstance(TransportService.class);
            writePortsFile("transport", transport.boundAddress());
        }

总结

上一篇下一篇

猜你喜欢

热点阅读