ElasticSearch Replica Shard Promotion
1 Overview
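As described in the article ElasticSearch Master and Node Fault Detection, the master node periodically pings the other, non-master nodes. If one of them fails, the master removes it from the cluster, and if that node held shards, a replica shard may have to be promoted to primary. This article explains how that promotion happens. (It focuses on the promotion that follows Node fault detection; for the other cases, trace the callers of RoutingNodes.promoteReplicaToPrimary to see in which situations it is triggered.)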
2 Replica Promotion Triggered by Node Failure
2.1 Node Failure Leads to Node Removal
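In ZenDiscovery, the class with which the master periodically pings the other, non-master nodes is NodesFaultDetection. In its constructor, ZenDiscovery registers a NodeFaultDetectionListener on NodesFaultDetection to handle node failures; see the article ElasticSearch Master and Node Fault Detection for the details. Here we go straight to the method that is called when the master detects a failed node, ZenDiscovery.handleNodeFailure: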
// ZenDiscovery
private void handleNodeFailure(final DiscoveryNode node, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a node failure
        return;
    }
    if (!localNodeMaster()) {
        // nothing to do here...
        return;
    }
    // remove the failed node
    removeNode(node, "zen-disco-node-failed", reason);
}

private void removeNode(final DiscoveryNode node, final String source, final String reason) {
    // Submit a cluster state update task to MasterService.
    // NodeRemovalClusterStateTaskExecutor is the key to replica promotion;
    // nodeRemovalExecutor is an instance of it.
    masterService.submitStateUpdateTask(
        source + "(" + node + "), reason(" + reason + ")",
        new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
        ClusterStateTaskConfig.build(Priority.IMMEDIATE),
        nodeRemovalExecutor,
        nodeRemovalExecutor);
}
The cluster state update task submitted to MasterService is then executed. NodeRemovalClusterStateTaskExecutor is a static nested class of ZenDiscovery.
2.2 NodeRemovalClusterStateTaskExecutor Triggers Replica Promotion
Now look at NodeRemovalClusterStateTaskExecutor.execute, the method that triggers replica promotion:
// ZenDiscovery.NodeRemovalClusterStateTaskExecutor
@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
    final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
    // tracks whether any node was actually removed, since a node may already
    // have been removed by an earlier task
    boolean removed = false;
    for (final Task task : tasks) {
        // if the node recorded in the task is still in the cluster state,
        // remove it and set removed to true
        if (currentState.nodes().nodeExists(task.node())) {
            // remove the failed node; remainingNodesBuilder is used later to build
            // the cluster state without the failed nodes
            remainingNodesBuilder.remove(task.node());
            removed = true;
        } else {
            logger.debug("node [{}] does not exist in cluster state, ignoring", task);
        }
    }
    // nothing was removed, so the cluster state is unchanged: return it as is
    if (!removed) {
        // no nodes to remove, keep the current cluster state
        return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
    }
    // build the cluster state with the failed nodes removed
    final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
    final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
    // if fewer master-eligible nodes remain than the minimum required for an election,
    // trigger a rejoin (re-election); a reroute follows the election, which is not covered here
    if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
        final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
        rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
            masterNodes, electMasterService.minimumMasterNodes()));
        return resultBuilder.build(currentState);
    } else {
        // enough master-eligible nodes remain: call allocationService.deassociateDeadNodes,
        // which is where replica promotion is triggered
        return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    }
}
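The three outcomes of execute can be hard to see among the builder calls, so here is a minimal sketch of the same decision flow on plain collections. It is not Elasticsearch code: NodeRemovalSketch, Result and the action strings are hypothetical names, node ids stand in for DiscoveryNode, and every node is assumed to be master-eligible.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the node-removal batch; every name here is hypothetical, not an Elasticsearch API. */
final class NodeRemovalSketch {

    /** What the batch decided, plus the node set of the cluster state it returns. */
    static final class Result {
        final String action;      // "unchanged", "rejoin" or "deassociate"
        final Set<String> nodes;

        Result(String action, Set<String> nodes) {
            this.action = action;
            this.nodes = nodes;
        }
    }

    static Result execute(Set<String> currentNodes, List<String> failedNodes, int minimumMasterNodes) {
        Set<String> remaining = new LinkedHashSet<>(currentNodes);
        boolean removed = false;
        for (String failed : failedNodes) {
            // An earlier task may already have removed this node, so only count real removals.
            if (remaining.remove(failed)) {
                removed = true;
            }
        }
        if (!removed) {
            // Nothing changed: hand back the current state untouched.
            return new Result("unchanged", currentNodes);
        }
        // Simplification: every node is treated as master-eligible.
        if (remaining.size() < minimumMasterNodes) {
            // Too few master candidates: keep the current state and ask for a rejoin.
            return new Result("rejoin", currentNodes);
        }
        // Enough candidates remain: this is the point where deassociateDeadNodes would run.
        return new Result("deassociate", remaining);
    }

    public static void main(String[] args) {
        Set<String> nodes = new LinkedHashSet<>(Arrays.asList("n1", "n2", "n3"));
        Result result = execute(nodes, Arrays.asList("n3", "n9"), 2);
        System.out.println(result.action + " " + result.nodes); // prints "deassociate [n1, n2]"
    }
}
```

Note that the rejoin branch returns the current node set, mirroring resultBuilder.build(currentState) in the listing: the removal is not applied when the master has to step back into an election.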
3 The Replica Promotion Flow
Replica promotion takes place inside AllocationService.deassociateDeadNodes:
// AllocationService
// As the Javadoc below states, every shard on the removed nodes is marked unassigned,
// and replicas may be promoted in the process.
/**
 * unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
 * if needed.
 */
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
    RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    // (that is, shuffle the unassigned shards once)
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
        clusterInfoService.getClusterInfo(), currentNanoTime());
    // first, clear from the shards any node id they used to belong to that is now dead
    // (handles every shard on the removed nodes)
    deassociateDeadNodes(allocation);
    if (allocation.routingNodesChanged()) {
        clusterState = buildResult(clusterState, allocation);
    }
    // when the master removes a non-master node, reroute is passed as true,
    // so a reroute is triggered here
    if (reroute) {
        return reroute(clusterState, reason);
    } else {
        return clusterState;
    }
}
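Next we look closely at the private overload deassociateDeadNodes(RoutingAllocation), which is the core of failure handling and therefore of replica promotion: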
// AllocationService
private void deassociateDeadNodes(RoutingAllocation allocation) {
    for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
        // the node currently being processed
        RoutingNode node = it.next();
        // if the node is still among the current data nodes it has not been removed,
        // so the shards allocated to it are left alone
        if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
            // its a live node, continue
            continue;
        }
        // now, go over all the shards routing on the node, and fail them
        // (the node is no longer in the current node list, so it has been removed
        // and every shard allocated to it is processed)
        for (ShardRouting shardRouting : node.copyShards()) {
            final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
            // index.unassigned.node_left.delayed_timeout sets how long reallocation of the shards
            // of a departed node is delayed; a value greater than zero marks the shard as delayed
            boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
            UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
                null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
            allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
        }
        // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
        // since it relies on the fact that the RoutingNode exists in the list of nodes
        it.remove();
    }
}
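Two details of the loop above are easy to miss: the shards are iterated over a copy (node.copyShards()) because failing a shard mutates the node's shard list, and the node is removed only after all of its shards have been failed. The sketch below reproduces just that shape on plain maps and lists. DeadNodeSketch and its string shard labels are hypothetical, and "failing" a shard is reduced to moving it into an unassigned list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of dropping dead nodes; the types are hypothetical stand-ins for RoutingNodes. */
final class DeadNodeSketch {

    /**
     * Moves every shard hosted on a node that is no longer live into the returned
     * unassigned list, then drops that node from the map.
     */
    static List<String> deassociateDeadNodes(Map<String, List<String>> shardsByNode, Set<String> liveNodes) {
        List<String> unassigned = new ArrayList<>();
        Iterator<Map.Entry<String, List<String>>> it = shardsByNode.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> node = it.next();
            if (liveNodes.contains(node.getKey())) {
                continue; // live node, leave its shards alone
            }
            // Iterate over a copy, because failing a shard mutates the node's own list.
            for (String shard : new ArrayList<>(node.getValue())) {
                node.getValue().remove(shard);
                unassigned.add(shard);
            }
            // Remove the node only after its shards have been failed.
            it.remove();
        }
        return unassigned;
    }

    public static void main(String[] args) {
        Map<String, List<String>> shardsByNode = new LinkedHashMap<>();
        shardsByNode.put("n1", new ArrayList<>(Arrays.asList("idx[0][p]", "idx[1][r]")));
        shardsByNode.put("n2", new ArrayList<>(Arrays.asList("idx[0][r]", "idx[1][p]")));
        List<String> unassigned = deassociateDeadNodes(shardsByNode, Collections.singleton("n1"));
        System.out.println(unassigned + " " + shardsByNode.keySet());
    }
}
```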
As the definition of deassociateDeadNodes shows, ES handles every shard allocated to a removed node by calling allocation.routingNodes().failShard:
// RoutingNodes
/**
 * Applies the relevant logic to handle a cancelled or failed shard.
 *
 * Moves the shard to unassigned or completely removes the shard (if relocation target).
 *
 * - If shard is a primary, this also fails initializing replicas.
 * - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
 * - If shard is a relocating primary, this also removes the primary relocation target shard.
 * - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
 *   relocation source information. This is possible as peer recovery is always done from the primary.
 * - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
 *
 */
public void failShard(Logger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData,
                      RoutingChangesObserver routingChangesObserver) {
    ensureMutable();
    assert failedShard.assignedToNode() : "only assigned shards can be failed";
    assert indexMetaData.getIndex().equals(failedShard.index()) :
        "shard failed for unknown index (shard entry: " + failedShard + ")";
    assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
        "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
            getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());
    logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());
    // if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
    // (if the shard is a primary, failShard is called recursively on each of its replicas that is INITIALIZING)
    if (failedShard.primary()) {
        // all assigned copies of this shard, including the primary itself
        List<ShardRouting> assignedShards = assignedShards(failedShard.shardId());
        if (assignedShards.isEmpty() == false) {
            // copy list to prevent ConcurrentModificationException
            for (ShardRouting routing : new ArrayList<>(assignedShards)) {
                // recursively fail every copy that is not the primary and is INITIALIZING
                if (!routing.primary() && routing.initializing()) {
                    // re-resolve replica as earlier iteration could have changed source/target of replica relocation
                    ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
                    assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
                    UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
                        "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
                        unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
                    failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
                }
            }
        }
    }
    // if the shard is relocating it is handled as follows (not covered in this article)
    if (failedShard.relocating()) {
        // find the shard that is initializing on the target node
        ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
        assert targetShard.isRelocationTargetOf(failedShard);
        if (failedShard.primary()) {
            logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
            // cancel and remove target shard
            remove(targetShard);
            routingChangesObserver.shardFailed(targetShard, unassignedInfo);
        } else {
            logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
            // promote to initializing shard without relocation source and ensure that removed relocation source
            // is not added back as unassigned shard
            removeRelocationSource(targetShard);
            routingChangesObserver.relocationSourceRemoved(targetShard);
        }
    }
    // fail actual shard
    // case 1: the shard is in the INITIALIZING state
    if (failedShard.initializing()) {
        // it is not a relocation target
        if (failedShard.relocatingNodeId() == null) {
            // the failed shard is a primary, so try to promote a replica
            if (failedShard.primary()) {
                // promote active replica to primary if active replica exists (only the case for shadow replicas)
                // pick the active replica hosted on the node running the highest Elasticsearch version
                // (the "version" here is the node's version, not the age of the shard's data)
                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                // no active replica was found: record the failed shard in the unassigned list
                if (activeReplica == null) {
                    moveToUnassigned(failedShard, unassignedInfo);
                } else {
                    // an active replica was found: move the primary to the unassigned list and demote it to a replica
                    // (the demotion just removes the shard from the assigned shards in RoutingNodes and adds it
                    // to the unassigned list, so it is not expanded here)
                    movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                    // the promotion (see the method itself for details) essentially marks the chosen
                    // active replica as the primary
                    promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
                }
            } else {
                // not a primary: just move it to the unassigned list
                // initializing shard that is not relocation target, just move to unassigned
                moveToUnassigned(failedShard, unassignedInfo);
            }
        } else {
            // the shard is a relocation target, so cancel the relocation
            // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
            // relocation. No shard is left unassigned
            logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
                unassignedInfo.shortSummary());
            ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
                failedShard.allocationId().getRelocationId());
            assert sourceShard.isRelocationSourceOf(failedShard);
            logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
                unassignedInfo.shortSummary());
            cancelRelocation(sourceShard);
            remove(failedShard);
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    } else {
        // case 2: the shard is not INITIALIZING, so it is active; the handling mirrors the branch above
        // and is not described again
        assert failedShard.active();
        if (failedShard.primary()) {
            // promote active replica to primary if active replica exists
            ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
            if (activeReplica == null) {
                moveToUnassigned(failedShard, unassignedInfo);
            } else {
                movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
            }
        } else {
            assert failedShard.primary() == false;
            if (failedShard.relocating()) {
                remove(failedShard);
            } else {
                moveToUnassigned(failedShard, unassignedInfo);
            }
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    }
    assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
        " was matched but wasn't removed";
}
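The branch that matters after a node failure is the active-primary one: pick an active replica, unassign and demote the old primary, then promote the replica. Below is a minimal sketch of that step only. PromotionSketch, ShardCopy and failPrimary are hypothetical names rather than the RoutingNodes API, node versions are plain integers, and the earlier step that fails INITIALIZING replicas is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of failing an active primary; hypothetical types, not the RoutingNodes API. */
final class PromotionSketch {

    /** One copy of a shard, a stand-in for ShardRouting. */
    static final class ShardCopy {
        String nodeId;          // null once the copy is unassigned
        final int nodeVersion;  // version of the node hosting this copy
        boolean primary;
        boolean active;

        ShardCopy(String nodeId, int nodeVersion, boolean primary, boolean active) {
            this.nodeId = nodeId;
            this.nodeVersion = nodeVersion;
            this.primary = primary;
            this.active = active;
        }
    }

    /** Fails an active primary; returns the promoted replica, or null if no replica was active. */
    static ShardCopy failPrimary(List<ShardCopy> copies, ShardCopy failed) {
        ShardCopy best = null;
        for (ShardCopy copy : copies) {
            // Candidates are active, assigned replicas; prefer the highest node version.
            boolean candidate = copy != failed && !copy.primary && copy.active && copy.nodeId != null;
            if (candidate && (best == null || copy.nodeVersion > best.nodeVersion)) {
                best = copy;
            }
        }
        // In both outcomes the failed copy ends up unassigned.
        failed.nodeId = null;
        failed.active = false;
        if (best != null) {
            failed.primary = false; // demote the old primary to an unassigned replica
            best.primary = true;    // promote the chosen replica
        }
        return best;
    }

    public static void main(String[] args) {
        ShardCopy primary = new ShardCopy("n1", 3, true, true);
        ShardCopy older = new ShardCopy("n2", 2, false, true);
        ShardCopy newer = new ShardCopy("n3", 3, false, true);
        List<ShardCopy> copies = new ArrayList<>(Arrays.asList(primary, older, newer));
        ShardCopy promoted = failPrimary(copies, primary);
        System.out.println("promoted replica on " + promoted.nodeId); // prints "promoted replica on n3"
    }
}
```

When no active replica exists, the failed copy stays a primary and is simply unassigned, which matches the moveToUnassigned branch in the listing.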
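With that covered, return to deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason). After the steps above it performs a reroute (the flag is true on this path), that is, another allocation pass. The reroute method returns the latest ClusterState after allocation; the details will be covered in a separate article.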
Looking back at ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute, whose source is listed above, it contains this statement:
//ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
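This statement returns the ClusterState that results from the node removal (marking shards unassigned, promoting replicas, rerouting, and so on). That state is then published to the other nodes in the cluster so that they can sync to it.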
4 Summary
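Some of the operations above, such as the replica promotion itself, were not expanded in detail because they are simple. The key point is that RoutingNodes records the allocation relationships between shards and nodes, and operations such as promotion and removal just update those relationships in RoutingNodes. Once the updates are done, a reroute is performed, and the resulting ClusterState is published to all nodes in the cluster to synchronize the cluster state.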