opensearch元数据分析
OpenSearch存储数据的默认位置是data目录:
![](https://img.haomeiwen.com/i9503183/8422d3ce4c08f796.png)
存储的数据的种类
nodes/0下的就是节点存储的数据了, 分为以下几种:
- state元信息
- index lucene的索引和段文件等
- translog 事务日志
![](https://img.haomeiwen.com/i9503183/3f67158b5d3cfc79.png)
![](https://img.haomeiwen.com/i9503183/7515e7d883af3906.png)
state 元信息
opensearch fork自 elasticsearch7.10, 与它一样, 关于集群状态的元数据, 是写在了lucene的段文件里.
只有具备Master资格的节点和数据节点可以持久化集群状态.
-
GatewayMetaState
负责元数据的接收和持久化 -
GatewayService
负责元数据的恢复(在重启集群)
- _state/*.st : 集群层面的元信息: UUID, Settings, template, etc
- indices/{index_uuid}/_state/*.st: 索引层面的元信息
- indices/{index_uuid}/_state/0/_state/*.st: 分片层面的元信息
关于 st 后缀的文件, 可以下载一个16进制的查看器, 查看文件的内容:
http://hexfiend.com/
![](https://img.haomeiwen.com/i9503183/53ec2a8f9e4490ba.png)
Notice: 持久化的state不包括某个分片存在于哪个节点这种内容路由信息,集群完全重启时,依靠
gateway的recovery过程重建RoutingTable.
![](https://img.haomeiwen.com/i9503183/42956a3c8c37e547.png)
元数据的持久化与变更
元数据从哪里来: 1. 持久化存储 2. ClusterState
GatewayMetaState
的内部类: GatewayClusterApplier extents ClusterStateApplier
会将接受到的ClusterState的元信息持久化. applyClusterState
的统一调用入口在ClusterApplierService#callClusterStateAppliers
public void applyClusterState(ClusterChangedEvent event) {
if (event.state().blocks().disableStatePersistence()) {
incrementalClusterStateWriter.setIncrementalWrite(false);
return;
}
try {
// Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
// that's higher than the last accepted term.
// TODO: can we get rid of this hack?
if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
incrementalClusterStateWriter.setCurrentTerm(event.state().term());
}
incrementalClusterStateWriter.updateClusterState(event.state());
incrementalClusterStateWriter.setIncrementalWrite(true);
} catch (WriteStateException e) {
logger.warn("Exception occurred when storing new meta data", e);
}
}
org.opensearch.gateway.IncrementalClusterStateWriter#updateClusterState
会负责持久化元信息(实际的写入动作还是委托给了MetaStateService以及MetadataStateFormat
, 处理失败等等:
先写入全局状态long globalStateGeneration = writeGlobalState(writer, newMetadata);
然后写入索引状态: Map<Index, Long> indexGenerations = writeIndicesMetadata(writer, newState);
最后写入Manifest
: writeManifest(writer, manifest);
而Manifest
文件是持久化元信息的入口.
举例, 写入全局状态
long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
assert finished == false : FINISHED_MSG;
try { // 1. 设置回滚清理 2. 写入 3. 设置提交清理 两个清理的区别是: 1 会删除previousManifest之前的提交记录 3. 是提交成功的,会删除此次提交之前的记录, 包括previousManifest
rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration()));
long generation = metaStateService.writeGlobalState(reason, metadata);
commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation));
return generation;
} catch (WriteStateException e) {
rollback();
throw e;
}
}
- 设置回滚清理
- 写入
- 设置提交清理 两个清理的区别是: 1 会删除previousManifest之前的提交记录 3. 是提交成功的,会删除此次提交之前的记录, 包括previousManifest.
所谓提交记录, 是提交点. _state
目录下的带有对应前缀的都是提交点, 数字后缀是版本. 比如node-130就是node的元信息, 版本130.
而底层执行读写的逻辑都被封装在了MetaDataStateFormat<T>
中, 主要由write
方法调用写(且写都遵循同样的逻辑 : 写临时文件 -> 刷盘 -> rename/move 原子操作 ), 不同的元数据作为子类, 需要实现抽象方法:
/**
* Writes the given state to the given XContentBuilder
* Subclasses need to implement this class for theirs specific state.
*/
public abstract void toXContent(XContentBuilder builder, T state) throws IOException;
/**
* Reads a new instance of the state from the given XContentParser
* Subclasses need to implement this class for theirs specific state.
*/
public abstract T fromXContent(XContentParser parser) throws IOException;
![](https://img.haomeiwen.com/i9503183/b677551b4c7bc0c8.png)
元数据的恢复
GatewayService负责元数据的恢复动作, 在集群状态发生变化的时候, 负责元数据的恢复. 调用点: ClusterApplierService在应用集群状态变更的时候. GaewayService
是一级的组件, 在 org.opensearch.gateway.GatewayService.doStart
组件启动的时候 ,将自己作为一个集群状态监听器注册到ClusterService
中, 这样, ClusterService
在ClusterState
发生变化的时候, 会调用ClusterApplierService
的 handleApplyCommit#onNewClusterState#applyChange
来触发监听器的调用. 在zen2模式下集群级别和索引级别的元数据在集群master选择出来后就已经有了, 而zen1(Gateway)是在更新索引级别的. 当索引级别的元数据也恢复以后, 就需要恢复shard级别的. 从始至终, 任务都在MasterSerivce的一个"masterService#updateTask"
的线程池里执行,这是个单线程的线程池, 也就是说,任务会阻塞执行.
index级别
GatewayService
依赖于:
-
AllocationService
shard分配服务, 元数据在cluster和index层面的恢复是由GatewayService操作的, 而shard级别的是由AllocationSerivce
, 所以完整的恢复需要依赖AllocationService
-
ClusterService
集群服务, 负责的是集群状态的协调, 状态变更的发生地,GatewayService
要感知到集群状态的变化, 必须依赖他 -
TransportNodesListGatewayMetaState
todo -
Discovery
用来判断 集群是用的zen1 还是 zen2(修改过的raft) -
ThreadPool
andSettings
GatewayService
初始化的时候, 读取元数据恢复的各类配置项.
GatewayService() {
this.expectedNodes = EXPECTED_NODES_SETTING.get(settings);
this.expectedDataNodes = EXPECTED_DATA_NODES_SETTING.get(settings);
this.expectedMasterNodes = EXPECTED_MASTER_NODES_SETTING.get(settings);
if (RECOVER_AFTER_TIME_SETTING.exists(settings)) {
recoverAfterTime = RECOVER_AFTER_TIME_SETTING.get(settings);
} else if (expectedNodes >= 0 || expectedDataNodes >= 0 || expectedMasterNodes >= 0) {
recoverAfterTime = DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET;
} else {
recoverAfterTime = null;
}
this.recoverAfterNodes = RECOVER_AFTER_NODES_SETTING.get(settings);
this.recoverAfterDataNodes = RECOVER_AFTER_DATA_NODES_SETTING.get(settings);
// default the recover after master nodes to the minimum master nodes in the discovery
if (RECOVER_AFTER_MASTER_NODES_SETTING.exists(settings)) {
recoverAfterMasterNodes = RECOVER_AFTER_MASTER_NODES_SETTING.get(settings);
} else if (discovery instanceof ZenDiscovery) {
recoverAfterMasterNodes = settings.getAsInt("discovery.zen.minimum_master_nodes", -1);
} else {
recoverAfterMasterNodes = -1;
}
}
- expectedNodes: "gateway.expected_nodes" : data和master-elig节点的个数, 要达到这个数据才会开始恢复
- expectedDataNodes: "gateway.expected_data_nodes"
- expectedMasterNodes: "gateway.expected_master_nodes"
- recoverAfterTime: "gateway.recover_after_time" : 如果没有达到预期的节点数量,则恢复过程将等待配置的时间,再尝试恢复。
- recoverAfterNodes: "gateway.recover_after_nodes" : 只要配置数量的节点(数据节点或具备Master资格的节点)加入集群就可以开始恢复
- recoverAfterDataNodes: "gateway.recover_after_data_nodes": 更精细化的配置
- recoverAfterMasterNodes: "gateway.recover_after_master_nodes" : 更精细化的配置
expected_nodes 和 recover_after_time 和 recover_after_nodes 是 或 的关系, 只要有一个先达到, 就执行.
STATE_NOT_RECOVERED_BLOCK : 阻塞态, 如果有, 则表示集群的元数据还没有恢复, 需要执行GatewayService
的恢复. 如果没有, 则已经恢复, 不用执行. 所以, 可以关注哪些地方会生成这个阻塞态:
Discovery
的子类zen
和Coordinator
的 doStart
方法, 即集群在选主的时候, 选主完成后(找到最大的term, 最大的version), 找到最新的ClusterState, 然后发布出去, 二阶段提交的第二阶段触发集群状态变化. 然后, 进入恢复流程.
根据选主算法的不同, 会采用不同的恢复流程:
if (discovery instanceof Coordinator) {
recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
} else {
final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
}
即, 不同选主算法下, 元数据的恢复流程被封装成了一个 Runnable
recoveryRunnable, 在clusterChanged#performStateRecovery
的时候,触发调用. 出发前要做一系列的校验, 判断前置条件, 如:
执行时间, 节点个数 等
zen1
入口:
org.opensearch.gateway.Gateway#performStateRecovery
步骤:
-
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
获取具有Matser资格的节点列表 -
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
获取每个节点上的 IndexMetaData, 是同步阻塞调用.TransportNodesAction
-
final int requiredAllocation = Math.max(1, minimumMasterNodes);
zen1模式的缺陷之一: 必须要用户设置了过半的个数 - 从各个node拿回来的信息, 比较每个node里version 最大的ClusterState, 取版本最大的. 并获取所有 master-elig节点中存储的所有index信息.
- 最大版本号的
ClusterState
可以用来做除了index信息之外的其他信息的基础, index信息, 将通过第四步的获取结果重建. 对从各个节点拿回来的信息按照index分组, Index -> List[Node]. 然后找出每个index在各个node中最大版本的元数据作为此index的元信息. - 删除无用信息, 不合法的信息, 最终基于最大版本号的
ClusterState
(没有index信息)以及第五步中获取的信息构建新的ClusterState - 构建后处理: 调用成功和失败的后处理. 成功:
GatewayRecoveryListener
, 提交一个"local-gateway-elected-state", new RecoverStateUpdateTask()
的子任务,- 任务会混合当前的
ClusterState
和6得到的恢复了的ClusterState
- 并且重建阻塞态, Settings中设置的.
- 最后调用父类 RecoverStateUpdateTask 的execute方法, 从indices重建 routingTable , 取出
Not recovered
的阻塞态. -
启动shard的恢复:
allocationService.reroute(newState, "state recovered");
- 任务会混合当前的
zen2 Coordinator
clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask()
与zen1 模式不同的是, Coordinator不需要再向各个master-eligible节点拉取indices的元信息以决定index 的最新元信息是什么, 因为利用了Raft的选主算法, 已经可以保证选完主之后发布的ClusterState
就是罪行的. 所以, 直接用他们构建routingTable即可, 然后触发 shard的恢复.
核心逻辑:
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
final ClusterState newState = Function.<ClusterState>identity()
.andThen(ClusterStateUpdaters::updateRoutingTable)
.andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
.apply(currentState);
return allocationService.reroute(newState, "state recovered");
}
shard级别
见其他章节