ElasticSearch Replica Shard Promotion

2019-01-05  persisting_

1 Overview

As described in the earlier article ElasticSearch Master、Node故障检测 (master/node fault detection), the master node periodically pings the other non-master nodes. When one of those nodes fails, it is removed from the cluster; if the removed node held shards, replica promotion may be involved. This article explains how a replica shard gets promoted. (We focus on the promotion triggered by node fault detection; for the other scenarios, you can trace the callers of RoutingNodes.promoteReplicaToPrimary to see where it is invoked.)

2 Replica Promotion Triggered by Node Failure

2.1 Node Failure Causes Node Removal

In ZenDiscovery, the class responsible for the master's periodic pings to the other non-master nodes is NodesFaultDetection. In its constructor, ZenDiscovery registers NodeFaultDetectionListener, a listener that handles node failures, with NodesFaultDetection; see ElasticSearch Master、Node故障检测 for the details. Here we jump straight to what happens when the master detects a node failure: ZenDiscovery.handleNodeFailure is called:

//ZenDiscovery
private void handleNodeFailure(final DiscoveryNode node, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a node failure
        return;
    }
    if (!localNodeMaster()) {
        // nothing to do here...
        return;
    }
    // remove the failed node
    removeNode(node, "zen-disco-node-failed", reason);
}

private void removeNode(final DiscoveryNode node, final String source, final String reason) {
    // submit a cluster-state update task to MasterService;
    // NodeRemovalClusterStateTaskExecutor is the key to replica promotion,
    // and nodeRemovalExecutor is an instance of NodeRemovalClusterStateTaskExecutor
    masterService.submitStateUpdateTask(
            source + "(" + node + "), reason(" + reason + ")",
            new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
            ClusterStateTaskConfig.build(Priority.IMMEDIATE),
            nodeRemovalExecutor,
            nodeRemovalExecutor);
}

The cluster-state update task submitted to MasterService will eventually be executed. NodeRemovalClusterStateTaskExecutor is a static inner class of ZenDiscovery.
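To make the batching behavior concrete, here is a minimal, self-contained sketch of the pattern MasterService follows (this is not ES source; ToyMasterService and the other names are invented for illustration): tasks submitted against the same executor are queued and later handed to that executor in a single execute(currentState, tasks) call on the cluster-state update thread.

import java.util.ArrayList;
import java.util.List;

// Toy model of batched cluster-state task execution (illustration only).
final class ToyMasterService<S, T> {
    interface BatchExecutor<A, B> {
        // Returns the new state after processing the whole batch of tasks.
        A execute(A currentState, List<B> tasks);
    }

    private final List<T> pending = new ArrayList<>();
    private final BatchExecutor<S, T> executor;
    private S state;

    ToyMasterService(S initialState, BatchExecutor<S, T> executor) {
        this.state = initialState;
        this.executor = executor;
    }

    // In ES, submitStateUpdateTask is asynchronous; here we simply queue the task.
    synchronized void submitStateUpdateTask(T task) {
        pending.add(task);
    }

    // In ES, a single update thread drains the queue and invokes the executor once
    // for all queued tasks that share the same executor instance.
    synchronized S runBatch() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        state = executor.execute(state, batch);
        return state;
    }
}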

2.2 NodeRemovalClusterStateTaskExecutor Triggers Replica Promotion

Let's look at NodeRemovalClusterStateTaskExecutor.execute, which is where replica promotion is triggered:

//ZenDiscovery.NodeRemovalClusterStateTaskExecutor
@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
    final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
    // record whether any node was actually removed; a node may already have been removed by an earlier task
    boolean removed = false;
    for (final Task task : tasks) {
        // if the node recorded in Task.node still exists in the cluster state,
        // remove it and set removed to true
        if (currentState.nodes().nodeExists(task.node())) {
            // remove the failed node; remainingNodesBuilder will later be used
            // to build the cluster state without it
            remainingNodesBuilder.remove(task.node());
            removed = true;
        } else {
            logger.debug("node [{}] does not exist in cluster state, ignoring", task);
        }
    }
    // if nothing was removed, the cluster state is unchanged; return the current state as-is
    if (!removed) {
        // no nodes to remove, keep the current cluster state
        return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
    }

    // build the cluster state with the failed nodes removed
    final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

    final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
    // if there are no longer enough master-eligible nodes for an election, trigger a rejoin (re-election);
    // the election will later trigger a reroute for allocation, which is not covered here
    if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
        final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
        rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                    masterNodes, electMasterService.minimumMasterNodes()));
        return resultBuilder.build(currentState);
    } else {
        // if there are still enough master-eligible nodes, call allocationService.deassociateDeadNodes,
        // which is where replica promotion is triggered
        return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    }
}
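The hasEnoughMasterNodes branch above is the discovery.zen.minimum_master_nodes quorum check: removal proceeds only if enough master-eligible nodes remain, otherwise the master steps down and rejoins. A minimal sketch of the idea (not the actual ElectMasterService code; the class and method below are illustrative):

import java.util.List;

// Toy version of the minimum-master-nodes check (illustration only).
final class ToyElectMaster {
    private final int minimumMasterNodes; // mirrors discovery.zen.minimum_master_nodes

    ToyElectMaster(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    // nodeIsMasterEligible holds one flag per remaining node.
    boolean hasEnoughMasterNodes(List<Boolean> nodeIsMasterEligible) {
        long masterEligible = nodeIsMasterEligible.stream().filter(b -> b).count();
        // when the setting is not configured (< 1), any number of nodes is considered enough
        return minimumMasterNodes < 1 || masterEligible >= minimumMasterNodes;
    }
}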

3 Replica Promotion Flow

Replica promotion is carried out in AllocationService.deassociateDeadNodes:

//AllocationService
// as the Javadoc below says, shards on nodes that have been removed are marked unassigned,
// which may trigger replica promotion along the way
/**
* unassigned an shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
* if needed.
*/
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
    RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    // shuffle the unassigned shards once
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
        clusterInfoService.getClusterInfo(), currentNanoTime());

    // first, clear from the shards any node id they used to belong to that is now dead
    // handle the shards on all removed nodes
    deassociateDeadNodes(allocation);

    if (allocation.routingNodesChanged()) {
        clusterState = buildResult(clusterState, allocation);
    }
    // when the master removes a non-master node, reroute is passed as true, triggering a reroute
    if (reroute) {
        return reroute(clusterState, reason);
    } else {
        return clusterState;
    }
}

Next we focus on the private deassociateDeadNodes overload, which is the core of failure handling and, in particular, of replica promotion:

//AllocationService
private void deassociateDeadNodes(RoutingAllocation allocation) {
    for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
        // the node currently being processed
        RoutingNode node = it.next();
        // if the current node list still contains this node, it has not been removed,
        // so the shards allocated on it are left alone
        if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
            // its a live node, continue
            continue;
        }
        // now, go over all the shards routing on the node, and fail them
        // the node is no longer in the node list, i.e. it has been removed,
        // so handle every shard allocated on it
        for (ShardRouting shardRouting : node.copyShards()) {
            final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
            // index.unassigned.node_left.delayed_timeout controls whether shards that were
            // allocated on a failed node should be reallocated with a delay
            boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
            UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
                null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
            allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
        }
        // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
        // since it relies on the fact that the RoutingNode exists in the list of nodes
        it.remove();
    }
}
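The delayed flag in the loop above comes from the index-level setting index.unassigned.node_left.delayed_timeout (one minute by default): when the value is greater than 0, shards that became unassigned because their node left are not reallocated immediately. As a hedged example, the setting value can be built with the ES Settings API as below; the index name and the way you apply it (typically an index settings update request) are up to you:

import org.elasticsearch.common.settings.Settings;

public class DelayedTimeoutExample {
    public static void main(String[] args) {
        // Delay reallocation of shards from a departed node by five minutes.
        Settings settings = Settings.builder()
                .put("index.unassigned.node_left.delayed_timeout", "5m")
                .build();
        System.out.println(settings.get("index.unassigned.node_left.delayed_timeout"));
    }
}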

From the definition of deassociateDeadNodes we can see that ES handles every shard allocated on the removed (failed) node by calling allocation.routingNodes().failShard:

//RoutingNodes
/**
* Applies the relevant logic to handle a cancelled or failed shard.
*
* Moves the shard to unassigned or completely removes the shard (if relocation target).
*
* - If shard is a primary, this also fails initializing replicas.
* - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
* - If shard is a relocating primary, this also removes the primary relocation target shard.
* - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
*   relocation source information. This is possible as peer recovery is always done from the primary.
* - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
*
*/
public void failShard(Logger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData,
                        RoutingChangesObserver routingChangesObserver) {
    ensureMutable();
    assert failedShard.assignedToNode() : "only assigned shards can be failed";
    assert indexMetaData.getIndex().equals(failedShard.index()) :
        "shard failed for unknown index (shard entry: " + failedShard + ")";
    assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
        "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
            getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());

    logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());

    // if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
    // if the failed shard is a primary, recursively call failShard for all of its INITIALIZING replicas first
    if (failedShard.primary()) {
        // get all assigned copies of this shard (this includes the primary)
        List<ShardRouting> assignedShards = assignedShards(failedShard.shardId());
        if (assignedShards.isEmpty() == false) {
            // copy list to prevent ConcurrentModificationException
            for (ShardRouting routing : new ArrayList<>(assignedShards)) {
                // for every non-primary copy in INITIALIZING state, recursively call failShard
                if (!routing.primary() && routing.initializing()) {
                    // re-resolve replica as earlier iteration could have changed source/target of replica relocation
                    ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
                    assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
                    UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
                        "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
                        unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
                    failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
                }
            }
        }
    }
    // if the failed shard is relocating, it is handled as follows (not covered in detail here)
    if (failedShard.relocating()) {
        // find the shard that is initializing on the target node
        ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
        assert targetShard.isRelocationTargetOf(failedShard);
        if (failedShard.primary()) {
            logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
            // cancel and remove target shard
            remove(targetShard);
            routingChangesObserver.shardFailed(targetShard, unassignedInfo);
        } else {
            logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
            // promote to initializing shard without relocation source and ensure that removed relocation source
            // is not added back as unassigned shard
            removeRelocationSource(targetShard);
            routingChangesObserver.relocationSourceRemoved(targetShard);
        }
    }

    // fail actual shard
    // the failed shard is in INITIALIZING state
    if (failedShard.initializing()) {
        // it is not the target of a relocation
        if (failedShard.relocatingNodeId() == null) {
            // if it is a primary, i.e. the primary has failed, try to promote a replica
            if (failedShard.primary()) {
                // promote active replica to primary if active replica exists (only the case for shadow replicas)
                // find the active replica with the highest version (the most up-to-date one)
                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                // if there is no active replica, just record the failed shard in the unassigned list
                if (activeReplica == null) {
                    moveToUnassigned(failedShard, unassignedInfo);
                } else {
                    // an active replica with the highest version was found: move the failed primary to
                    // the unassigned list and demote it to a replica; the demotion is not expanded here,
                    // it simply removes the shard from the assigned shards in RoutingNodes and adds it
                    // to the unassigned list
                    movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                    // see promoteReplicaToPrimary for the details of the promotion; it essentially marks the chosen active replica as the new primary
                    promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
                }
            } else {
                // not a primary: simply move it to the unassigned list
                // initializing shard that is not relocation target, just move to unassigned
                moveToUnassigned(failedShard, unassignedInfo);
            }
        } else {
            // the shard is a relocation target: cancel the relocation
            // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
            // relocation. No shard is left unassigned
            logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
                unassignedInfo.shortSummary());
            ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
                failedShard.allocationId().getRelocationId());
            assert sourceShard.isRelocationSourceOf(failedShard);
            logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
                unassignedInfo.shortSummary());
            cancelRelocation(sourceShard);
            remove(failedShard);
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    } else {
        // the failed shard is not INITIALIZING (it is active); the handling mirrors the logic above and is not repeated here
        assert failedShard.active();
        if (failedShard.primary()) {
            // promote active replica to primary if active replica exists
            ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
            if (activeReplica == null) {
                moveToUnassigned(failedShard, unassignedInfo);
            } else {
                movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
            }
        } else {
            assert failedShard.primary() == false;
            if (failedShard.relocating()) {
                remove(failedShard);
            } else {
                moveToUnassigned(failedShard, unassignedInfo);
            }
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    }
    assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
        " was matched but wasn't removed";
}
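To make the promotion path concrete, here is a small self-contained toy model of the failedShard.primary() branch above (this is not ES source; ShardCopy and its fields are invented for illustration): pick the active replica with the highest version, move the failed primary to the unassigned list (demoting it if a replacement exists), and mark the chosen replica as the new primary.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Toy model of primary failure handling and replica promotion (illustration only).
final class ToyShardTable {
    static final class ShardCopy {
        boolean primary;
        boolean active;
        long version; // stands in for "highest version / most up to date"

        ShardCopy(boolean primary, boolean active, long version) {
            this.primary = primary;
            this.active = active;
            this.version = version;
        }
    }

    final List<ShardCopy> assigned = new ArrayList<>();
    final List<ShardCopy> unassigned = new ArrayList<>();

    void failPrimary(ShardCopy failedPrimary) {
        // counterpart of activeReplicaWithHighestVersion: the most up-to-date active replica
        Optional<ShardCopy> candidate = assigned.stream()
                .filter(s -> s != failedPrimary && !s.primary && s.active)
                .max(Comparator.comparingLong(s -> s.version));

        assigned.remove(failedPrimary);
        failedPrimary.active = false;

        if (candidate.isPresent()) {
            // counterpart of movePrimaryToUnassignedAndDemoteToReplica + promoteReplicaToPrimary
            failedPrimary.primary = false;
            candidate.get().primary = true;
        }
        // counterpart of moveToUnassigned: the failed copy waits to be reallocated
        unassigned.add(failedPrimary);
    }
}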

With that covered, let's return to deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason). After the steps described above it performs a reroute, i.e. runs allocation again; reroute returns the latest ClusterState after allocation. The reroute process will be covered in a dedicated article later.

Looking back at ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute, whose source was listed above, it contains the following line:

//ZenDiscovery.NodeRemovalClusterStateTaskExecutor.execute
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));

This line returns the ClusterState produced by the node-removal handling (which includes marking shards as unassigned, replica promotion, reroute, and so on); that state is then published to the other nodes in the cluster for cluster-state synchronization.
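At a very high level, publishing means the master sends the new ClusterState to every other node, which then applies it locally; details such as diff-based publishing and the two-phase send/commit protocol are omitted here. A rough, self-contained sketch of the idea (all names below are invented):

import java.util.List;
import java.util.function.Consumer;

// Rough sketch of cluster-state publication (illustration only).
final class ToyPublisher<S> {
    // Each "node" is modeled as a consumer that applies the received state locally.
    void publish(S newClusterState, List<Consumer<S>> otherNodes) {
        for (Consumer<S> node : otherNodes) {
            node.accept(newClusterState);
        }
    }
}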

4 Summary

Some of the operations above, such as the replica promotion itself, were not expanded in detail because they are fairly simple. The key point is that RoutingNodes records the assignment relationships between shards and nodes; promotion, removal, and similar operations just update those relationships in RoutingNodes. Once the updates are done, a reroute is performed, and the latest ClusterState is returned and published to all nodes in the cluster for cluster-state synchronization.
