ElasticSearch Replica Shard Promotion

2019-01-05  persisting_

1 Overview

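As described in ElasticSearch Master、Node故障检测 (ElasticSearch Master and Node Fault Detection), the master node periodically pings the other, non-master nodes. When one of them fails, the master removes it from the cluster. If the failed node held shards, and in particular primaries, a replica of such a shard may have to be promoted to primary. This article explains how that promotion works. It focuses on promotion after node fault detection; to find the other situations that trigger a promotion, trace the callers of RoutingNodes.promoteReplicaToPrimary.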

2 Replica Promotion Triggered by Node Failure

2.1 Node Failure Leads to Node Removal

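In ZenDiscovery, the class through which the master periodically pings the non-master nodes is NodesFaultDetection. In its constructor, ZenDiscovery registers a NodeFaultDetectionListener on NodesFaultDetection to handle node failures; see ElasticSearch Master、Node故障检测 for the details. Here we go straight to what happens once the master detects a failed node: it calls ZenDiscovery.handleNodeFailure, which in turn calls removeNode.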

private void handleNodeFailure(final DiscoveryNode node, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a node failure
        return;
    }
    if (!localNodeMaster()) {
        // nothing to do here...
        return;
    }
    removeNode(node, "zen-disco-node-failed", reason);
}

private void removeNode(final DiscoveryNode node, final String source, final String reason) {
    // submits a cluster state update task that is executed by NodeRemovalClusterStateTaskExecutor
    // (the receiver of the call and its trailing arguments are omitted in this excerpt)
    ...submitStateUpdateTask(
            source + "(" + node + "), reason(" + reason + ")",
            new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
            ...);
}
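To make the two guards concrete, here is a minimal, self-contained sketch that models them outside Elasticsearch. FakeZenDiscovery, LifecycleState and the submittedTasks list are hypothetical stand-ins: the real removeNode submits a cluster state update task instead of appending to a list, and the real source string is built from the full DiscoveryNode rather than a bare node id.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle states; only STARTED matters for the guard below.
enum LifecycleState { INITIALIZED, STARTED, STOPPED, CLOSED }

// Hypothetical stand-in for ZenDiscovery that keeps only the guards of handleNodeFailure.
class FakeZenDiscovery {
    private final LifecycleState state;
    private final boolean localNodeMaster;
    // records the source string of every task that removeNode would submit
    final List<String> submittedTasks = new ArrayList<>();

    FakeZenDiscovery(LifecycleState state, boolean localNodeMaster) {
        this.state = state;
        this.localNodeMaster = localNodeMaster;
    }

    void handleNodeFailure(String nodeId, String reason) {
        if (state != LifecycleState.STARTED) {
            return; // not started: ignore the failure
        }
        if (!localNodeMaster) {
            return; // only the elected master removes nodes from the cluster
        }
        removeNode(nodeId, "zen-disco-node-failed", reason);
    }

    private void removeNode(String nodeId, String source, String reason) {
        // the real method submits a task for NodeRemovalClusterStateTaskExecutor;
        // this sketch only keeps the task's source string
        submittedTasks.add(source + "(" + nodeId + "), reason(" + reason + ")");
    }
}
```

Only a started master records a task; a non-master node, or a node that has not started yet, simply returns.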


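2.2 NodeRemovalClusterStateTaskExecutor Triggers Replica Promotion

The task submitted by removeNode is processed by NodeRemovalClusterStateTaskExecutor.execute. It drops the failed nodes from the node list and checks that enough master-eligible nodes remain; if not, it calls rejoin and keeps the current state. Otherwise it hands the reduced cluster state to AllocationService.deassociateDeadNodes: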

public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
    final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
    boolean removed = false;
    for (final Task task : tasks) {
        if (currentState.nodes().nodeExists(task.node())) {
            remainingNodesBuilder.remove(task.node());
            removed = true;
        } else {
            logger.debug("node [{}] does not exist in cluster state, ignoring", task);
        }
    }

    if (!removed) {
        // no nodes to remove, keep the current cluster state
        return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
    }

    final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);

    final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
    if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
        final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
        rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                                                    masterNodes, electMasterService.minimumMasterNodes()));
        return resultBuilder.build(currentState);
    } else {
        return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    }
}
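The decision made by execute can be sketched as follows, assuming a node list reduced to a map from node id to a master-eligible flag. NodeRemovalSketch and its Outcome values are hypothetical names, and the quorum rule (at least one, and at least minimumMasterNodes, master-eligible nodes must remain) is a simplification standing in for ElectMasterService.hasEnoughMasterNodes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of NodeRemovalClusterStateTaskExecutor.execute.
class NodeRemovalSketch {
    enum Outcome { UNCHANGED, REJOIN, DEASSOCIATE_DEAD_NODES }

    static final class Result {
        final Outcome outcome;
        final Map<String, Boolean> nodes; // node id -> master-eligible, as it would be published

        Result(Outcome outcome, Map<String, Boolean> nodes) {
            this.outcome = outcome;
            this.nodes = nodes;
        }
    }

    static Result execute(Map<String, Boolean> currentNodes, List<String> failedNodeIds, int minimumMasterNodes) {
        Map<String, Boolean> remaining = new LinkedHashMap<>(currentNodes);
        boolean removed = false;
        for (String nodeId : failedNodeIds) {
            if (currentNodes.containsKey(nodeId)) {
                remaining.remove(nodeId);
                removed = true;
            }
            // unknown node ids are ignored (the real code logs them at debug level)
        }
        if (!removed) {
            return new Result(Outcome.UNCHANGED, currentNodes); // keep the current state
        }
        long masterEligible = remaining.values().stream().filter(Boolean::booleanValue).count();
        if (masterEligible == 0 || masterEligible < minimumMasterNodes) {
            // simplified quorum check failed: rejoin and keep the current state
            return new Result(Outcome.REJOIN, currentNodes);
        }
        // quorum still present: the reduced node set goes on to deassociateDeadNodes
        return new Result(Outcome.DEASSOCIATE_DEAD_NODES, remaining);
    }
}
```

With three master-eligible nodes and minimumMasterNodes = 2, losing a data node proceeds to deassociation, while losing two master-eligible nodes at once leads to a rejoin.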

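3 The Replica Promotion Process

Everything starts in AllocationService.deassociateDeadNodes, which unassigns the shards held by nodes that have left the cluster and then, if requested, reroutes: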

/**
 * Unassigns any shards that are associated with nodes that are no longer part of the cluster, potentially promoting replicas
 * if needed.
 */
public ClusterState deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason) {
    RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
        clusterInfoService.getClusterInfo(), currentNanoTime());

    // first, clear from the shards any node id they used to belong to that is now dead
    deassociateDeadNodes(allocation);

    if (allocation.routingNodesChanged()) {
        clusterState = buildResult(clusterState, allocation);
    }
    if (reroute) {
        return reroute(clusterState, reason);
    } else {
        return clusterState;
    }
}
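The control flow of this public method (deassociate first, build a new state only if the routing changed, then optionally reroute) can be modelled with a routing table reduced to a map from shard id to node id, where null means unassigned. This is a hypothetical sketch: DeassociateAndRerouteSketch is an invented name, and its reroute is a placeholder that puts every unassigned shard on the alphabetically first live node, whereas the real reroute consults the allocation deciders.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model: a "cluster state" is just shardId -> nodeId (null = unassigned).
class DeassociateAndRerouteSketch {
    static Map<String, String> deassociateDeadNodes(Map<String, String> routing, Set<String> liveNodes, boolean reroute) {
        Map<String, String> updated = new TreeMap<>(routing);
        boolean changed = false;
        // first, clear the node id of every shard that sat on a dead node
        for (Map.Entry<String, String> entry : updated.entrySet()) {
            if (entry.getValue() != null && !liveNodes.contains(entry.getValue())) {
                entry.setValue(null);
                changed = true;
            }
        }
        // only produce a new "state" if the routing actually changed
        Map<String, String> result = changed ? updated : routing;
        return reroute ? reroute(result, liveNodes) : result;
    }

    // Placeholder allocator: assigns every unassigned shard to the alphabetically first live node.
    static Map<String, String> reroute(Map<String, String> routing, Set<String> liveNodes) {
        if (liveNodes.isEmpty()) {
            return routing;
        }
        String target = new TreeSet<>(liveNodes).first();
        Map<String, String> out = new TreeMap<>(routing);
        for (Map.Entry<String, String> entry : out.entrySet()) {
            if (entry.getValue() == null) {
                entry.setValue(target);
            }
        }
        return out;
    }
}
```

Returning the input unchanged when nothing was deassociated mirrors the routingNodesChanged() check: no new state is built if the routing is untouched.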


The private overload walks every RoutingNode. Nodes that are still data nodes are skipped; for every other node, each of its shards is failed through RoutingNodes.failShard with reason NODE_LEFT, and only then is the node itself removed:

private void deassociateDeadNodes(RoutingAllocation allocation) {
    for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
        RoutingNode node = it.next();
        if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
            // its a live node, continue
            continue;
        }
        // now, go over all the shards routing on the node, and fail them
        for (ShardRouting shardRouting : node.copyShards()) {
            final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
            boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
            UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
                null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
            allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
        }
        // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
        // since it relies on the fact that the RoutingNode exists in the list of nodes
        it.remove();
    }
}
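The loop above can be modelled as follows, assuming a RoutingNode is just a node id mapped to a mutable list of shard names. DeadNodeSweepSketch, Failure and the single delayedNodeLeftTimeoutNanos argument are hypothetical simplifications; in Elasticsearch the delay is read per index from INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING (index.unassigned.node_left.delayed_timeout). The sketch keeps the two details that matter: shards are iterated over a copy, and a dead node is removed only after its shards have been failed, because failShard still looks the node up.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the private deassociateDeadNodes(RoutingAllocation) loop.
class DeadNodeSweepSketch {
    // what the sketch records for each failed shard (a stand-in for UnassignedInfo)
    static final class Failure {
        final String shard;
        final String details;
        final boolean delayed;

        Failure(String shard, String details, boolean delayed) {
            this.shard = shard;
            this.details = details;
            this.delayed = delayed;
        }
    }

    // routingNodes maps node id -> mutable list of the shards on that node
    static List<Failure> sweep(Map<String, List<String>> routingNodes, Set<String> liveDataNodes,
                               long delayedNodeLeftTimeoutNanos) {
        List<Failure> failures = new ArrayList<>();
        for (Iterator<Map.Entry<String, List<String>>> it = routingNodes.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, List<String>> node = it.next();
            if (liveDataNodes.contains(node.getKey())) {
                continue; // live data node, nothing to do
            }
            boolean delayed = delayedNodeLeftTimeoutNanos > 0;
            // iterate over a copy, like node.copyShards(), because failShard mutates the node
            for (String shard : new ArrayList<>(node.getValue())) {
                failures.add(failShard(routingNodes, node.getKey(), shard,
                        "node_left[" + node.getKey() + "]", delayed));
            }
            // remove the dead node only afterwards: failShard still needs to find it
            it.remove();
        }
        return failures;
    }

    private static Failure failShard(Map<String, List<String>> routingNodes, String nodeId, String shard,
                                     String details, boolean delayed) {
        List<String> shards = routingNodes.get(nodeId);
        if (shards == null) {
            throw new IllegalStateException("node " + nodeId + " is no longer in the routing nodes");
        }
        shards.remove(shard);
        return new Failure(shard, details, delayed);
    }
}
```

Moving it.remove() above the inner loop would make failShard throw, which is exactly the ordering constraint the original comment warns about.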


RoutingNodes.failShard is where the promotion actually happens. Its Javadoc lists every case; in both primary branches, if activeReplicaWithHighestVersion finds an active replica, the failed primary is demoted and promoteReplicaToPrimary is called:

/**
 * Applies the relevant logic to handle a cancelled or failed shard.
 *
 * Moves the shard to unassigned or completely removes the shard (if relocation target).
 *
 * - If shard is a primary, this also fails initializing replicas.
 * - If shard is an active primary, this also promotes an active replica to primary (if such a replica exists).
 * - If shard is a relocating primary, this also removes the primary relocation target shard.
 * - If shard is a relocating replica, this promotes the replica relocation target to a full initializing replica, removing the
 *   relocation source information. This is possible as peer recovery is always done from the primary.
 * - If shard is a (primary or replica) relocation target, this also clears the relocation information on the source shard.
 */
public void failShard(Logger logger, ShardRouting failedShard, UnassignedInfo unassignedInfo, IndexMetaData indexMetaData,
                      RoutingChangesObserver routingChangesObserver) {
    assert failedShard.assignedToNode() : "only assigned shards can be failed";
    assert indexMetaData.getIndex().equals(failedShard.index()) :
        "shard failed for unknown index (shard entry: " + failedShard + ")";
    assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId()) == failedShard :
        "shard routing to fail does not exist in routing table, expected: " + failedShard + " but was: " +
            getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId());

    logger.debug("{} failing shard {} with unassigned info ({})", failedShard.shardId(), failedShard, unassignedInfo.shortSummary());

    // if this is a primary, fail initializing replicas first (otherwise we move RoutingNodes into an inconsistent state)
    if (failedShard.primary()) {
        List<ShardRouting> assignedShards = assignedShards(failedShard.shardId());
        if (assignedShards.isEmpty() == false) {
            // copy list to prevent ConcurrentModificationException
            for (ShardRouting routing : new ArrayList<>(assignedShards)) {
                if (!routing.primary() && routing.initializing()) {
                    // re-resolve replica as earlier iteration could have changed source/target of replica relocation
                    ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
                    assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
                    UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
                        "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
                        unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT);
                    failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver);
                }
            }
        }
    }

    if (failedShard.relocating()) {
        // find the shard that is initializing on the target node
        ShardRouting targetShard = getByAllocationId(failedShard.shardId(), failedShard.allocationId().getRelocationId());
        assert targetShard.isRelocationTargetOf(failedShard);
        if (failedShard.primary()) {
            logger.trace("{} is removed due to the failure/cancellation of the source shard", targetShard);
            // cancel and remove target shard
            remove(targetShard);
            routingChangesObserver.shardFailed(targetShard, unassignedInfo);
        } else {
            logger.trace("{}, relocation source failed / cancelled, mark as initializing without relocation source", targetShard);
            // promote to initializing shard without relocation source and ensure that removed relocation source
            // is not added back as unassigned shard
            removeRelocationSource(targetShard);
            routingChangesObserver.relocationSourceRemoved(targetShard);
        }
    }

    // fail actual shard
    if (failedShard.initializing()) {
        if (failedShard.relocatingNodeId() == null) {
            if (failedShard.primary()) {
                // promote active replica to primary if active replica exists (only the case for shadow replicas)
                ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
                if (activeReplica == null) {
                    moveToUnassigned(failedShard, unassignedInfo);
                } else {
                    movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                    promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
                }
            } else {
                // initializing shard that is not relocation target, just move to unassigned
                moveToUnassigned(failedShard, unassignedInfo);
            }
        } else {
            // The shard is a target of a relocating shard. In that case we only need to remove the target shard and cancel the source
            // relocation. No shard is left unassigned
            logger.trace("{} is a relocation target, resolving source to cancel relocation ({})", failedShard,
                unassignedInfo.shortSummary());
            ShardRouting sourceShard = getByAllocationId(failedShard.shardId(),
                failedShard.allocationId().getRelocationId());
            assert sourceShard.isRelocationSourceOf(failedShard);
            logger.trace("{}, resolved source to [{}]. canceling relocation ... ({})", failedShard.shardId(), sourceShard,
                unassignedInfo.shortSummary());
            cancelRelocation(sourceShard);
            remove(failedShard);
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    } else {
        assert failedShard.active();
        if (failedShard.primary()) {
            // promote active replica to primary if active replica exists
            ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
            if (activeReplica == null) {
                moveToUnassigned(failedShard, unassignedInfo);
            } else {
                movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
                promoteReplicaToPrimary(activeReplica, indexMetaData, routingChangesObserver);
            }
        } else {
            assert failedShard.primary() == false;
            if (failedShard.relocating()) {
                remove(failedShard);
            } else {
                moveToUnassigned(failedShard, unassignedInfo);
            }
        }
        routingChangesObserver.shardFailed(failedShard, unassignedInfo);
    }
    assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard +
        " was matched but wasn't removed";
}
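The part of failShard that matters most for this article, an active primary failing while replicas exist, can be sketched as below. PromotionSketch and Copy are hypothetical; nodeVersion is an integer standing in for the Elasticsearch version of the node holding each copy, which, as far as we can tell, is what activeReplicaWithHighestVersion compares. The sketch follows the same order as the real method: initializing replicas are failed first with reason PRIMARY_FAILED, then the failed primary either stays an unassigned primary (no active replica) or is demoted to an unassigned replica while the chosen replica becomes primary.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the "active primary failed" path of RoutingNodes.failShard.
class PromotionSketch {
    enum State { UNASSIGNED, INITIALIZING, STARTED }

    // one copy (primary or replica) of a single shard
    static final class Copy {
        final String nodeId;
        final int nodeVersion; // stands in for the version of the node holding this copy
        boolean primary;
        State state;
        String unassignedReason;

        Copy(String nodeId, int nodeVersion, boolean primary, State state) {
            this.nodeId = nodeId;
            this.nodeVersion = nodeVersion;
            this.primary = primary;
            this.state = state;
        }
    }

    // Fails an active primary and returns the promoted replica, or null if none could be promoted.
    static Copy failActivePrimary(List<Copy> copies, Copy failed) {
        if (!failed.primary || failed.state != State.STARTED) {
            throw new IllegalArgumentException("expected an active primary");
        }
        // 1. replicas still initializing recover from this primary, so fail them first
        for (Copy copy : copies) {
            if (!copy.primary && copy.state == State.INITIALIZING) {
                copy.state = State.UNASSIGNED;
                copy.unassignedReason = "PRIMARY_FAILED";
            }
        }
        // 2. choose the active replica on the node with the highest version
        Optional<Copy> candidate = copies.stream()
                .filter(copy -> !copy.primary && copy.state == State.STARTED)
                .max(Comparator.comparingInt(copy -> copy.nodeVersion));
        failed.state = State.UNASSIGNED;
        failed.unassignedReason = "NODE_LEFT";
        if (!candidate.isPresent()) {
            return null; // no active replica: the shard stays as an unassigned primary
        }
        // 3. demote the failed primary to an unassigned replica and promote the candidate
        failed.primary = false;
        Copy promoted = candidate.get();
        promoted.primary = true;
        return promoted;
    }
}
```

Failing initializing replicas before choosing a candidate means a recovering copy can never be promoted, even if it sits on the newest node.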
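With that covered, return to deassociateDeadNodes(ClusterState clusterState, boolean reroute, String reason). Because NodeRemovalClusterStateTaskExecutor passes reroute = true, the method finishes with a reroute, i.e. it runs shard allocation again; reroute returns the latest ClusterState after allocation. The reroute process will be covered in a separate article.

That resulting ClusterState is what NodeRemovalClusterStateTaskExecutor.execute returns:

return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));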


4 Summary



