Elasticsearch Source Code Analysis: A Brief Look at the Startup Process
1. Startup Command and Startup Classes
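First, here is the Java command that starts Elasticsearch. es.pidfile is the path of the pid file, es.path.home is the Elasticsearch installation directory, and es.default.path.logs, es.default.path.data, es.default.path.work and es.default.path.conf point to the directories for logs, data, working files and configuration, respectively.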
${JAVA_HOME}/bin/java \
-Des.pidfile=/path/xxx.pid \
-Des.default.path.home=/path/xxx \
-Des.default.path.logs=/path/logs \
-Des.default.path.data=/path/data \
-Des.default.path.work=/path/work \
-Des.default.path.conf=/path/config \
-Des.path.home=/path/xxx \
-cp :/path/xxx.jar \
org.elasticsearch.bootstrap.Elasticsearch
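Elasticsearch has two startup classes, Elasticsearch and ElasticsearchF, where F stands for foreground. They differ in whether the process runs in the foreground or in the background, and in whether logs are written to log files or shown on the console. ElasticsearchF selects foreground mode by calling System.setProperty("es.foreground", "yes").
Both startup classes ultimately call the static Bootstrap.main method to start Elasticsearch.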
public class Elasticsearch extends Bootstrap {
public static void close(String[] args) {
Bootstrap.close(args);
}
public static void main(String[] args) {
Bootstrap.main(args);
}
}
public class ElasticsearchF {
public static void close(String[] args) {
Bootstrap.close(args);
}
public static void main(String[] args) {
System.setProperty("es.foreground", "yes");
Bootstrap.main(args);
}
}
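Bootstrap then has to turn that property into a foreground flag. The sketch below shows one way to do it, assuming Bootstrap only checks whether es.foreground (or a legacy es-foreground, by analogy with es.pidfile/es-pidfile) is present. ForegroundSketch and isForeground are hypothetical names, and a Properties object is passed in instead of reading System.getProperties() so the logic is easy to exercise:

```java
import java.util.Properties;

// Hypothetical sketch: derive the foreground flag from startup properties.
// Assumption: the mere presence of es.foreground (or es-foreground) means foreground mode.
class ForegroundSketch {

    static boolean isForeground(Properties props) {
        String value = props.getProperty("es.foreground", props.getProperty("es-foreground"));
        return value != null;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(isForeground(props)); // false: started through Elasticsearch

        props.setProperty("es.foreground", "yes"); // what ElasticsearchF does
        System.out.println(isForeground(props)); // true
    }
}
```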
2. Environment Initialization
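In Bootstrap.main, the pid file path is first read from the es.pidfile (or es-pidfile) system property. The process ID of the JVM running Elasticsearch is written to that file, and fPidFile.deleteOnExit() is called so that the pid file is deleted when the JVM exits.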
public class Bootstrap {
public static void main(String[] args) {
// pid file path, from the startup argument -Des.pidfile=/opt/elasticsearch-1.6.0/run/elasticsearch.pid
final String pidFile = System.getProperty("es.pidfile", System.getProperty("es-pidfile"));
if (pidFile != null) {
try {
File fPidFile = new File(pidFile);
// write the JVM process ID into fPidFile
// ...
// delete the pid file when the JVM terminates
fPidFile.deleteOnExit();
} catch (Exception e) {
// ...
}
}
}
}
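The code above elides how the process ID is written. A minimal sketch of that step, assuming Java 9 or later: it uses ProcessHandle.current().pid(), which may differ from what Elasticsearch 1.x itself does, and PidFileSketch and writePidFile are hypothetical names:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch of the elided "write the JVM process ID" step.
// Assumption: Java 9+ is available, so ProcessHandle can report the PID.
class PidFileSketch {

    static File writePidFile(String pidFile) {
        File fPidFile = new File(pidFile);
        try {
            File parent = fPidFile.getAbsoluteFile().getParentFile();
            if (parent != null) {
                Files.createDirectories(parent.toPath()); // make sure e.g. run/ exists
            }
            long pid = ProcessHandle.current().pid();
            Files.write(fPidFile.toPath(), Long.toString(pid).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException("failed to write pid file " + pidFile, e);
        }
        // As in Bootstrap: remove the file when the JVM terminates normally.
        fPidFile.deleteOnExit();
        return fPidFile;
    }

    public static void main(String[] args) {
        String pidFile = System.getProperty("es.pidfile", System.getProperty("es-pidfile"));
        if (pidFile != null) {
            System.out.println("wrote " + writePidFile(pidFile));
        }
    }
}
```

Note that deleteOnExit() only takes effect on a normal JVM shutdown; a process killed with kill -9 leaves the pid file behind.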
Next, initialSettings() loads the settings from system properties and configuration files; the main logic lives in InternalSettingsPreparer.prepareSettings():
public class Bootstrap {
public static void main(String[] args) {
// ...
Tuple<Settings, Environment> tuple = null;
try {
tuple = initialSettings(foreground);
setupLogging(tuple);
} catch (Exception e) {
// ...
}
// ...
}
}
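The overall flow is as follows:
① First, settings prefixed with elasticsearch. and es. are loaded from System.getProperties(), with the prefix stripped from each key (es.path.home becomes path.home). The elasticsearch.default. and es.default. properties are loaded before them, so the non-default values take precedence.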
public class InternalSettingsPreparer {
public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
// ...
// just create enough settings to build the environment
ImmutableSettings.Builder settingsBuilder = settingsBuilder().put(pSettings);
if (useSystemProperties) { // load the *.default. system properties first
settingsBuilder.putProperties("elasticsearch.default.", System.getProperties())
.putProperties("es.default.", System.getProperties())
.putProperties("elasticsearch.", System.getProperties(), ignorePrefixes) // same prefix, but skip keys starting with es.default. or elasticsearch.default.
.putProperties("es.", System.getProperties(), ignorePrefixes);
}
settingsBuilder.replacePropertyPlaceholders();
// build the Environment, which resolves path.home, path.data (default dir: data), path.logs (logs), path.conf (config) and path.work (work)
Environment environment = new Environment(settingsBuilder.build()); // if path.conf is unset, it defaults to {path.home}/config (path.home defaults to user.dir)
// ...
}
}
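To make the prefix handling in step ① concrete, here is a simplified model of putProperties. It assumes that a matching system property is stored under its key with the prefix removed and that keys starting with an ignored prefix are skipped. PrefixSettingsSketch is hypothetical and uses a plain sorted map rather than ImmutableSettings.Builder:

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical model of prefix-based loading; not the real ImmutableSettings.Builder.
class PrefixSettingsSketch {

    static void putProperties(Map<String, String> settings, String prefix,
                              Properties props, String... ignorePrefixes) {
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(prefix)) {
                continue;
            }
            boolean ignored = false;
            for (String ignore : ignorePrefixes) {
                if (key.startsWith(ignore)) {
                    ignored = true;
                    break;
                }
            }
            if (!ignored) {
                // store under the key without its prefix, e.g. es.path.home -> path.home
                settings.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
    }

    static Map<String, String> load(Properties props) {
        String[] ignorePrefixes = {"es.default.", "elasticsearch.default."};
        Map<String, String> settings = new TreeMap<>();
        // defaults first, so the es./elasticsearch. values loaded afterwards overwrite them
        putProperties(settings, "elasticsearch.default.", props);
        putProperties(settings, "es.default.", props);
        putProperties(settings, "elasticsearch.", props, ignorePrefixes);
        putProperties(settings, "es.", props, ignorePrefixes);
        return settings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("es.default.path.home", "/opt/default");
        props.setProperty("es.path.home", "/opt/es");
        props.setProperty("es.default.path.logs", "/var/log/es");
        System.out.println(load(props)); // {path.home=/opt/es, path.logs=/var/log/es}
    }
}
```

Because later calls overwrite earlier ones, loading the default. prefixes first is what lets es.path.home win over es.default.path.home.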
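② Next, the Environment is initialized. This mainly sets Elasticsearch's path.home, path.conf, path.plugins, path.work, path.data, path.repo and path.logs. If path.home is not set, it defaults to System.getProperty("user.dir"); any other path that is not set defaults to the corresponding directory under path.home (for example, conf is {path.home}/config and data is {path.home}/data/<cluster name>)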
public class Environment {
public Environment(Settings settings) {
this.settings = settings;
if (settings.get("path.home") != null) {
homeFile = new File(cleanPath(settings.get("path.home")));
} else {
homeFile = new File(System.getProperty("user.dir"));
}
if (settings.get("path.conf") != null) {
configFile = new File(cleanPath(settings.get("path.conf")));
} else {
configFile = new File(homeFile, "config");
}
// ......
}
}
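The defaults in step ② boil down to a few conditionals. This is a sketch, not the real Environment class: settings are a plain Map, cleanPath() is skipped, only path.home, path.conf and path.data are covered, and the per-cluster data directory follows the {path.home}/data/<cluster name> layout described above. EnvironmentSketch is a hypothetical name:

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the path defaults described in step ②.
class EnvironmentSketch {
    final File homeFile;
    final File configFile;
    final File dataWithClusterFile;

    EnvironmentSketch(Map<String, String> settings) {
        String home = settings.get("path.home");
        homeFile = home != null ? new File(home) : new File(System.getProperty("user.dir"));

        String conf = settings.get("path.conf");
        configFile = conf != null ? new File(conf) : new File(homeFile, "config");

        String data = settings.get("path.data");
        File dataFile = data != null ? new File(data) : new File(homeFile, "data");
        // data lives in a per-cluster subdirectory, e.g. {path.home}/data/elasticsearch
        String clusterName = settings.getOrDefault("cluster.name", "elasticsearch");
        dataWithClusterFile = new File(dataFile, clusterName);
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("path.home", "/opt/es");
        EnvironmentSketch env = new EnvironmentSketch(settings);
        System.out.println(env.configFile);          // /opt/es/config on Unix-like systems
        System.out.println(env.dataWithClusterFile); // /opt/es/data/elasticsearch on Unix-like systems
    }
}
```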
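③ The configuration files named by the es.default.config, es.config and elasticsearch.config properties are loaded in that order; properties that are not set are skipped. Unless a file was loaded via es.config or elasticsearch.config, Elasticsearch then also loads elasticsearch.yml, .yaml, .json and .properties from the config directory ({path.home}/config by default)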
public class InternalSettingsPreparer {
public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
// ...
if (loadConfigSettings) {
boolean loadFromEnv = true;
if (useSystemProperties) {// true by default
// if it's the default, load it, but also load from env
if (Strings.hasText(System.getProperty("es.default.config"))) { // load the default config file named by this system property
loadFromEnv = true;
settingsBuilder.loadFromUrl(environment.resolveConfig(System.getProperty("es.default.config")));
}
// ...
}
// after es.default.config, keep loading elasticsearch.yml, .yaml, .json and .properties
if (loadFromEnv) {
for (String allowedSuffix : ALLOWED_SUFFIXES) {
try {
// e.g. elasticsearch.yml in the config directory
settingsBuilder.loadFromUrl(environment.resolveConfig("elasticsearch" + allowedSuffix));
} catch (FailedToResolveConfigException e) {
// ignore
}
}
}
}
// ...
}
}
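The loading order of step ③ is easier to see as a list of sources. The sketch below assumes that setting es.config or elasticsearch.config turns off the default elasticsearch.* files while es.default.config does not; that part of the code is elided by "// ..." above, so this is inferred from the description. ConfigOrderSketch is hypothetical and only lists candidate sources instead of parsing them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: which config sources step ③ would try, and in which order.
// Files that fail to resolve are skipped later (FailedToResolveConfigException is ignored).
class ConfigOrderSketch {
    static final String[] ALLOWED_SUFFIXES = {".yml", ".yaml", ".json", ".properties"};

    static List<String> configSources(Properties props) {
        List<String> sources = new ArrayList<>();
        boolean loadFromEnv = true;
        String defaultConfig = props.getProperty("es.default.config");
        if (hasText(defaultConfig)) {
            sources.add(defaultConfig); // loadFromEnv stays true
        }
        for (String key : new String[] {"es.config", "elasticsearch.config"}) {
            String explicit = props.getProperty(key);
            if (hasText(explicit)) {
                loadFromEnv = false; // an explicit config file replaces elasticsearch.*
                sources.add(explicit);
            }
        }
        if (loadFromEnv) {
            for (String suffix : ALLOWED_SUFFIXES) {
                sources.add("elasticsearch" + suffix); // resolved against the config directory
            }
        }
        return sources;
    }

    static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(configSources(props));
        // [elasticsearch.yml, elasticsearch.yaml, elasticsearch.json, elasticsearch.properties]
        props.setProperty("es.config", "/etc/es/custom.yml");
        System.out.println(configSources(props)); // [/etc/es/custom.yml]
    }
}
```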
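④ System properties prefixed with es. and elasticsearch. are applied again, so they override any values already set (including those from configuration files); properties prefixed with es.default. and elasticsearch.default. are ignored in this pass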
public class InternalSettingsPreparer {
public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
// ...
settingsBuilder.put(pSettings);
// override settingsBuilder with system properties of the same prefixes, except es.default. and elasticsearch.default.
if (useSystemProperties) {
settingsBuilder.putProperties("elasticsearch.", System.getProperties(), ignorePrefixes)
.putProperties("es.", System.getProperties(), ignorePrefixes);
}
settingsBuilder.replacePropertyPlaceholders();
// ...
}
}
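⑤ If the configuration does not set name, it is read from the name system property. If it is still missing, node.name is used; if node.name is also empty, a random string from config/names.txt is chosen as the node name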
public class InternalSettingsPreparer {
public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
// ...
if (settingsBuilder.get("name") == null) {
String name = System.getProperty("name");
if (name != null) {
settingsBuilder.put("name", name);
}
}
Settings settings = replacePromptPlaceholders(settingsBuilder.build(), terminal);
if (settings.get("name") == null) {
final String name = settings.get("node.name");
if (name == null || name.isEmpty()) {
settings = settingsBuilder().put(settings)
.put("name", Names.randomNodeName(environment.resolveConfig("names.txt")))
.build();
} else {
settings = settingsBuilder().put(settings)
.put("name", name)
.build();
}
}
// ...
}
}
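Step ⑤ is a chain of fallbacks. A hypothetical sketch of that chain, assuming the entries of config/names.txt have already been read into a list and ignoring replacePromptPlaceholders(); NodeNameSketch and resolveName are illustrative names only:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of the node-name fallback chain in step ⑤.
class NodeNameSketch {

    static String resolveName(Map<String, String> settings, String nameProperty,
                              List<String> candidateNames, Random random) {
        String name = settings.get("name");
        if (name == null) {
            name = nameProperty; // value of System.getProperty("name")
        }
        if (name != null) {
            return name;
        }
        String nodeName = settings.get("node.name");
        if (nodeName == null || nodeName.isEmpty()) {
            // last resort: a random entry from names.txt
            return candidateNames.get(random.nextInt(candidateNames.size()));
        }
        return nodeName;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("candidate-a", "candidate-b");
        Map<String, String> settings = new HashMap<>();
        settings.put("node.name", "node-1");
        System.out.println(resolveName(settings, null, names, new Random())); // node-1
        System.out.println(resolveName(new HashMap<>(), null, names, new Random())); // one of the candidates
    }
}
```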
⑥ If the cluster name setting cluster.name is not set, it defaults to "elasticsearch"
public class InternalSettingsPreparer {
public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
// ...
if (settingsBuilder.get(ClusterName.SETTING) == null) {
settingsBuilder.put(ClusterName.SETTING, ClusterName.DEFAULT.value());
}
// ...
}
}
3. Node Setup and Startup
Once the settings are prepared, bootstrap.setup(true, tuple) initializes the node and bootstrap.start() starts it:
public class Bootstrap {
public static void main(String[] args) {
// ...
try {
if (!foreground) {
Loggers.disableConsoleLogging();
System.out.close();
}
// fail if using broken version
JVMCheck.check();
bootstrap.setup(true, tuple);
stage = "Startup";
bootstrap.start();
} catch (Throwable e) {
// ...
}
// ...
}
}
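Before the node is initialized, JVMCheck.check() validates the JVM (the check is skipped when the system property named by JVM_BYPASS is true). Startup is aborted with an exception in either of the following cases:
① The JVM vendor is IBM Corporation
② The JVM vendor is Oracle Corporation, the HotSpot version is 21.0-b17, 24.0-b56, 24.45-b08 or 24.51-b03, and the JVM was not started with the matching workaround flag (-XX:-UseLoopPredicate for 21.0-b17, -XX:-UseSuperWord for the other three)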
public class JVMCheck {
static final Map<String,HotspotBug> JVM_BROKEN_HOTSPOT_VERSIONS;
static {
Map<String,HotspotBug> bugs = new HashMap<>();
// 1.7.0: loop optimizer bug
bugs.put("21.0-b17", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-7070134", "-XX:-UseLoopPredicate"));
// register allocation issues (technically only x86/amd64). This impacted update 40, 45, and 51
bugs.put("24.0-b56", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
bugs.put("24.45-b08", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
bugs.put("24.51-b03", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
JVM_BROKEN_HOTSPOT_VERSIONS = Collections.unmodifiableMap(bugs);
}
static void check() {
if (Boolean.parseBoolean(System.getProperty(JVM_BYPASS))) {
// ...
} else if ("Oracle Corporation".equals(Constants.JVM_VENDOR)) {
HotspotBug bug = JVM_BROKEN_HOTSPOT_VERSIONS.get(Constants.JVM_VERSION);
if (bug != null) { // workAround is -XX:-UseLoopPredicate or -XX:-UseSuperWord
if (bug.workAround != null && ManagementFactory.getRuntimeMXBean().getInputArguments().contains(bug.workAround)) {
Loggers.getLogger(JVMCheck.class).warn(bug.getWarningMessage());
} else {
throw new RuntimeException(bug.getErrorMessage());
}
}
} else if ("IBM Corporation".equals(Constants.JVM_VENDOR)) {
// currently any JVM from IBM will easily result in index corruption.
// ...
throw new RuntimeException(sb.toString());
}
}
}
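The decision logic of JVMCheck.check() can be exercised without a broken JVM by passing the vendor, version and JVM arguments in explicitly. This simplified sketch keeps the same version table and workaround flags but omits the JVM_BYPASS property and the real messages; JvmCheckSketch and its return-value convention are hypothetical:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of JVMCheck.check() with its inputs made explicit.
class JvmCheckSketch {
    static final Map<String, String> WORKAROUNDS;
    static {
        Map<String, String> m = new HashMap<>();
        m.put("21.0-b17", "-XX:-UseLoopPredicate");
        m.put("24.0-b56", "-XX:-UseSuperWord");
        m.put("24.45-b08", "-XX:-UseSuperWord");
        m.put("24.51-b03", "-XX:-UseSuperWord");
        WORKAROUNDS = Collections.unmodifiableMap(m);
    }

    /** Returns a warning if a workaround is active, null if the JVM is fine; throws otherwise. */
    static String check(String vendor, String version, List<String> inputArguments) {
        if ("Oracle Corporation".equals(vendor)) {
            String workAround = WORKAROUNDS.get(version);
            if (workAround != null) {
                if (inputArguments.contains(workAround)) {
                    return "running broken JVM " + version + " with workaround " + workAround;
                }
                throw new RuntimeException("JVM " + version + " is broken; start it with " + workAround);
            }
        } else if ("IBM Corporation".equals(vendor)) {
            throw new RuntimeException("IBM JVMs are rejected because they can corrupt indices");
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check("Oracle Corporation", "24.0-b56",
                Arrays.asList("-Xmx1g", "-XX:-UseSuperWord"))); // warning, startup continues
        System.out.println(check("Oracle Corporation", "25.0-b70", Collections.emptyList())); // null
    }
}
```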
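If the JVM passes the check, bootstrap.setup() enters the node initialization phase, which mainly creates the node object through the InternalNode constructor (via NodeBuilder.build()).
Finally, it registers a shutdown hook that calls node.close() before the JVM exits: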
public class Bootstrap {
private void setup(boolean addShutdownHook, Tuple<Settings, Environment> tuple) throws Exception {
// ...
Settings nodeSettings = ImmutableSettings.settingsBuilder()
.put(settings)
.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true)
.build();
NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(nodeSettings).loadConfigSettings(false);
node = nodeBuilder.build();
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() { // runs before the JVM shuts down
@Override
public void run() {
node.close();
}
});
}
}
}
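Constructing the node mainly involves three steps: create the NodeEnvironment; call modules.add() to register each Elasticsearch module (adding a module also calls its spawnModules(), so the modules it spawns are registered as well); and finally create the injector, which runs each module's configure() method to bind interfaces to their implementations.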
public final class InternalNode implements Node {
public InternalNode(Settings preparedSettings, boolean loadConfigSettings) throws ElasticsearchException {
// ...
final NodeEnvironment nodeEnvironment;
try {
nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
} catch (IOException ex) {
throw new ElasticsearchIllegalStateException("Failed to created node environment", ex);
}
boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new NodeEnvironmentModule(nodeEnvironment));
modules.add(new DiscoveryModule(settings));
//...
// create the injector, which performs the injection
injector = modules.createInjector();
// get the implementation bound to Client
client = injector.getInstance(Client.class);
threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
success = true;
} finally {
if (!success) {
nodeEnvironment.close();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
logger.info("initialized");
}
}
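The recursive effect of spawnModules() can be sketched as follows. SketchModule, SpawnModules and ModulesBuilder here are simplified, hypothetical stand-ins (not the Guice or Elasticsearch types), assuming that adding a module also adds every module it spawns, recursively, right after it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: adding a module also registers the modules it spawns.
class ModulesSketch {
    interface SketchModule { String name(); }

    interface SpawnModules { Iterable<? extends SketchModule> spawnModules(); }

    static final class ModulesBuilder {
        final List<SketchModule> modules = new ArrayList<>();

        ModulesBuilder add(SketchModule... toAdd) {
            for (SketchModule module : toAdd) {
                addOne(module);
            }
            return this;
        }

        private void addOne(SketchModule module) {
            modules.add(module);
            if (module instanceof SpawnModules) {
                // spawned modules are registered recursively right after their parent
                for (SketchModule spawned : ((SpawnModules) module).spawnModules()) {
                    addOne(spawned);
                }
            }
        }
    }

    static SketchModule simple(String name) { return () -> name; }

    // Mimics DiscoveryModule: spawns the local or Zen discovery module.
    static final class Discovery implements SketchModule, SpawnModules {
        final boolean local;
        Discovery(boolean local) { this.local = local; }
        public String name() { return "DiscoveryModule"; }
        public Iterable<? extends SketchModule> spawnModules() {
            return Arrays.asList(simple(local ? "LocalDiscoveryModule" : "ZenDiscoveryModule"));
        }
    }

    public static void main(String[] args) {
        ModulesBuilder builder = new ModulesBuilder()
                .add(simple("NodeEnvironmentModule"), new Discovery(false));
        for (SketchModule m : builder.modules) {
            System.out.println(m.name()); // NodeEnvironmentModule, DiscoveryModule, ZenDiscoveryModule
        }
    }
}
```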
Client is bound to NodeClient in NodeClientModule:
public class NodeClientModule extends AbstractModule {
@Override
protected void configure() {
// ...
bind(Client.class).to(NodeClient.class).asEagerSingleton();
}
}
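Take DiscoveryModule as an example: the node.local and node.mode settings decide between local discovery and Zen discovery, and the corresponding discovery module is then created (a value under DISCOVERY_TYPE_KEY can override this default):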
public class DiscoveryModule extends AbstractModule implements SpawnModules {
@Override
public Iterable<? extends Module> spawnModules() {
Class<? extends Module> defaultDiscoveryModule;
if (DiscoveryNode.localNode(settings)) {
defaultDiscoveryModule = LocalDiscoveryModule.class;
} else {
defaultDiscoveryModule = ZenDiscoveryModule.class;
}
return ImmutableList.of(Modules.createModule(settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule"), settings));
}
@Override
protected void configure() {
bind(DiscoveryService.class).asEagerSingleton();
}
}
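settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule") also lets a short type name select the module. The exact expansion rule is not shown here, so the sketch below assumes a lowercase package plus a capitalized class name (zen becomes org.elasticsearch.discovery.zen.ZenDiscoveryModule). resolveModuleClassName is a hypothetical helper that only builds the name and does not load the class:

```java
import java.util.Locale;

// Hypothetical sketch of expanding a short discovery type into a module class name.
// Assumption: "<prefix><type lowercased>.<Type capitalized><suffix>"; the real rule may differ.
class DiscoveryTypeSketch {

    static String resolveModuleClassName(String type, String defaultClassName) {
        if (type == null || type.isEmpty()) {
            return defaultClassName; // chosen from DiscoveryNode.localNode(settings)
        }
        if (type.contains(".")) {
            return type; // treat it as an already fully qualified class name
        }
        String pkg = type.toLowerCase(Locale.ROOT);
        String simpleName = Character.toUpperCase(type.charAt(0)) + type.substring(1);
        return "org.elasticsearch.discovery." + pkg + "." + simpleName + "DiscoveryModule";
    }

    public static void main(String[] args) {
        System.out.println(resolveModuleClassName("zen", null));
        // org.elasticsearch.discovery.zen.ZenDiscoveryModule
    }
}
```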
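The two node discovery modes, local and network, correspond to LocalDiscovery and ZenDiscovery respectively; DiscoveryNode.localNode() decides which one applies: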
public class DiscoveryNode implements Streamable, Serializable {
public static boolean localNode(Settings settings) {
if (settings.get("node.local") != null) {
return settings.getAsBoolean("node.local", false);
}
if (settings.get("node.mode") != null) {
String nodeMode = settings.get("node.mode");
if ("local".equals(nodeMode)) {
return true;
} else if ("network".equals(nodeMode)) {
return false;
} else {
throw new ElasticsearchIllegalArgumentException("unsupported node.mode [" + nodeMode + "]. Should be one of [local, network].");
}
}
return false;
}
}
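Once the node has been initialized, bootstrap.start() starts it. This simply calls node.start(), the counterpart of node.close() in the shutdown hook.
Starting the node amounts to starting its modules, by calling each module's start() method: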
public final class InternalNode implements Node {
public Node start() {
if (!lifecycle.moveToStarted()) {
return this;
}
// start the TCP transport service
injector.getInstance(TransportService.class).start();
// node discovery and master election
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start(); // ZenDiscovery.doStart
discoService.waitForInitialState();
// start the gateway service; this must happen after DiscoveryService has started
injector.getInstance(GatewayService.class).start();
// start the HTTP service
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).start();
}
// ...
return this;
}
}
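The start-once guard and the service order in node.start() can be illustrated with a toy node. Lifecycle here is a simplified, hypothetical stand-in for the lifecycle object used above, and the services are recorded by name instead of actually being started:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of lifecycle.moveToStarted() and the start order in node.start().
class NodeStartSketch {
    enum State { INITIALIZED, STARTED, CLOSED }

    static final class Lifecycle {
        private State state = State.INITIALIZED;

        /** Returns false if already started, so a second start() call is a no-op. */
        synchronized boolean moveToStarted() {
            if (state == State.STARTED) {
                return false;
            }
            if (state == State.CLOSED) {
                throw new IllegalStateException("can't start a closed node");
            }
            state = State.STARTED;
            return true;
        }
    }

    final Lifecycle lifecycle = new Lifecycle();
    final List<String> startedServices = new ArrayList<>();
    final boolean httpEnabled;

    NodeStartSketch(boolean httpEnabled) { this.httpEnabled = httpEnabled; }

    NodeStartSketch start() {
        if (!lifecycle.moveToStarted()) {
            return this;
        }
        startedServices.add("TransportService"); // TCP transport first
        startedServices.add("DiscoveryService"); // then discovery and master election
        startedServices.add("GatewayService");   // gateway only after discovery
        if (httpEnabled) {
            startedServices.add("HttpServer");    // http.enabled defaults to true
        }
        return this;
    }

    public static void main(String[] args) {
        NodeStartSketch node = new NodeStartSketch(true).start().start();
        System.out.println(node.startedServices);
        // [TransportService, DiscoveryService, GatewayService, HttpServer]
    }
}
```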
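4. Setting Up keepAliveThread
After the node has started, Bootstrap creates a CountDownLatch named keepAliveLatch with a count of 1 and registers a shutdown hook that calls countDown() on it before the JVM exits.
It then starts keepAliveThread, a non-daemon (user) thread that blocks in keepAliveLatch.await() until countDown() is called. Because the JVM does not exit while a non-daemon thread is still running, this keeps Elasticsearch alive: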
public class Bootstrap {
public static void main(String[] args) {
keepAliveLatch = new CountDownLatch(1);
// keep this thread alive (non daemon thread) until we shutdown
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
keepAliveLatch.countDown();
}
});
keepAliveThread = new Thread(new Runnable() {
@Override
public void run() {
try {
keepAliveLatch.await();
} catch (InterruptedException e) {
// bail out
}
}
}, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
keepAliveThread.setDaemon(false);
keepAliveThread.start();
}
}
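The keep-alive pattern itself fits in a few lines. This minimal sketch uses the same CountDownLatch plus non-daemon thread idea; since a demo cannot wait for a real JVM shutdown, it calls countDown() by hand where Bootstrap relies on the shutdown hook (startKeepAlive is a hypothetical helper):

```java
import java.util.concurrent.CountDownLatch;

// Minimal sketch of the keep-alive pattern: a non-daemon thread parked on a latch keeps
// the JVM alive until the latch is released.
class KeepAliveSketch {

    static Thread startKeepAlive(CountDownLatch latch) {
        Thread keepAlive = new Thread(() -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "keepAlive-sketch");
        keepAlive.setDaemon(false); // a live non-daemon thread prevents a normal JVM exit
        keepAlive.start();
        return keepAlive;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // In Bootstrap this is done by a shutdown hook:
        // Runtime.getRuntime().addShutdownHook(new Thread(latch::countDown));
        Thread keepAlive = startKeepAlive(latch);
        System.out.println("alive: " + keepAlive.isAlive());
        latch.countDown(); // what the shutdown hook does on SIGTERM or System.exit
        keepAlive.join();
        System.out.println("alive: " + keepAlive.isAlive()); // alive: false
    }
}
```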
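During startup Elasticsearch injects the TransportModule and HttpServerModule modules, and starting the node starts TransportService and HttpServer. Both ultimately use Netty to listen for TCP and HTTP messages and handle client requests.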