ElasticSearch Master、Node故障检测

2018-12-16  本文已影响0人  persisting_

1 概述


1.1 NodesFaultDetection、MasterFaultDetection初始化


//ZenDiscovery 构造函数
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
this.nodesFD.addListener(new NodeFaultDetectionListener());


protected void doStart() {
    DiscoveryNode localNode = transportService.getLocalNode();
    assert localNode != null;
    synchronized (stateMutex) {

1.2 MasterFaultDetection、NodesFaultDetection初始化后的启动

MasterFaultDetection、NodesFaultDetection类对象在ZenDiscovery中被初始化。在《ElasticSearch Master选举、自动Cluster发现》中我们介绍了ElasticSearch中的主节点选举和Cluster自动发现。


public synchronized void closeAndBecomeMaster() {

    Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
    final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

    tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
    tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
    masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);




// return true if state has been sent to applier
boolean processNextCommittedClusterState(String reason) {
    // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
    // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
    if (newClusterState.nodes().isLocalNodeElectedMaster()) {
        // update the set of nodes to ping
    } else {
        // check to see that we monitor the correct master of the cluster
        if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
                "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");


2 NodesFaultDetection逻辑


    * make sure that nodes in clusterState are pinged. Any pinging to nodes which are not
    * part of the cluster will be stopped
public void updateNodesAndPing(ClusterState clusterState) {
    // remove any nodes we don't need, this will cause their FD to stop
    for (DiscoveryNode monitoredNode : nodesFD.keySet()) {
        if (!clusterState.nodes().nodeExists(monitoredNode)) {
    // add any missing nodes
    for (DiscoveryNode node : clusterState.nodes()) {
        if (node.equals(localNode)) {
            // no need to monitor the local node
        if (!nodesFD.containsKey(node)) {
            NodeFD fd = new NodeFD(node);
            // it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up.
            nodesFD.put(node, fd);
            // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
            threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);



public void run() {
    //return NodeFD.this.equals(nodesFD.get(node));
    if (!running()) {
    final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
    transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
                public PingResponse newInstance() {
                    return new PingResponse();
                public void handleResponse(PingResponse response) {
                    if (!running()) {
                    retryCount = 0;
                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
                public void handleException(TransportException exp) {
                    if (!running()) {

                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                    if (retryCount >= pingRetryCount) {
                        // not good, failure
                        if (nodesFD.remove(node, NodeFD.this)) {
                            notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum ["
                                + pingRetryTimeout + "] timeout");
                    } else {
                        // resend the request, not reschedule, rely on send timeout
                        transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);

                public String executor() {
                    return ThreadPool.Names.SAME;

2.1 NodesFaultDetection连接异常处理


protected void handleTransportDisconnect(DiscoveryNode node) {
    NodeFD nodeFD = nodesFD.remove(node);
    if (nodeFD == null) {
    if (connectOnNetworkDisconnect) {
        NodeFD fd = new NodeFD(node);
        try {
            nodesFD.put(node, fd);
            // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
            threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);
        } catch (Exception e) {
            logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
            // clean up if needed, just to be safe..
            nodesFD.remove(node, fd);
            notifyNodeFailure(node, "transport disconnected (with verified connect)");
    } else {
        logger.trace("[node  ] [{}] transport disconnected", node);
        notifyNodeFailure(node, "transport disconnected");

2.2 目标(非主)节点对Master Ping请求的响应


public NodesFaultDetection(...) {
        PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, false, false, new PingRequestHandler());
class PingRequestHandler implements TransportRequestHandler<PingRequest> {
    public void messageReceived(PingRequest request, TransportChannel channel, Task task) throws Exception {
        // if we are not the node we are supposed to be pinged, send an exception
        // this can happen when a kill -9 is sent, and another node is started using the same port
        if (!localNode.equals(request.targetNode())) {
            throw new IllegalStateException("Got pinged as node " + request.targetNode() + "], but I am node " + localNode );

        // PingRequest will have clusterName set to null if it came from a node of version <1.4.0
        if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            // Don't introduce new exception for bwc reasons
            throw new IllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster ["
                + clusterName + "]");
        channel.sendResponse(new PingResponse());

private void notifyPingReceived(final PingRequest pingRequest) {
    threadPool.generic().execute(new Runnable() {
        public void run() {
            for (Listener listener : listeners) {



public void onPingReceived(final NodesFaultDetection.PingRequest pingRequest) {
    // if we are master, we don't expect any fault detection from another node. If we get it
    // means we potentially have two masters in the cluster.
    if (!localNodeMaster()) {
    if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) {
        logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
    logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
    synchronized (stateMutex) {
        ClusterState currentState = committedState.get();
        if (currentState.nodes().isLocalNodeElectedMaster()) {
            handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");

private void handleAnotherMaster(...) {
        assert Thread.holdsLock(stateMutex);
        if (otherClusterStateVersion > localClusterState.version()) {
            rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
        } else {
            // TODO: do this outside mutex
            logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
            try {
                // make sure we're connected to this node (connect to node does nothing if we're already connected)
                // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
                // in the past (after a master failure, for example)
                transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                    public void handleException(TransportException exp) {
                        logger.warn(() -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);

2.3 NodesFaultDetection处理节点故障



private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
    try {
        threadPool.generic().execute(new Runnable() {
            public void run() {
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
    } catch (EsRejectedExecutionException ex) {
        logger.trace(() -> new ParameterizedMessage(
                "[node  ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), ex);

public void onNodeFailure(DiscoveryNode node, String reason) {
    handleNodeFailure(node, reason);

private void handleNodeFailure(final DiscoveryNode node, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a node failure
    if (!localNodeMaster()) {
        // nothing to do here...
    removeNode(node, "zen-disco-node-failed", reason);
 private void removeNode(final DiscoveryNode node, final String source, final String reason) {
            source + "(" + node + "), reason(" + reason + ")",
            new NodeRemovalClusterStateTaskExecutor.Task(node, reason),

3 MasterFaultDetection逻辑


 public void restart(DiscoveryNode masterNode, String reason) {
    synchronized (masterNodeMutex) {
        if (logger.isDebugEnabled()) {
            logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);

 private void innerStop() {
    // also will stop the next ping schedule
    this.retryCount = 0;
    if (masterPinger != null) {
        masterPinger = null;
    this.masterNode = null;

private void innerStart(final DiscoveryNode masterNode) {
    this.masterNode = masterNode;
    this.retryCount = 0;
    if (masterPinger != null) {
    this.masterPinger = new MasterPinger();

    // we start pinging slightly later to allow the chosen master to complete it's own master election
    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);


public void run() {
    if (!running) {
        // return and don't spawn...
    final DiscoveryNode masterToPing = masterNode;
    if (masterToPing == null) {
        // master is null, should not happen, but we are still running, so reschedule
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);

    final MasterPingRequest request = new MasterPingRequest(
        clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
    final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
    transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
        new TransportResponseHandler<MasterPingResponseResponse>() {
                public MasterPingResponseResponse newInstance() {
                    return new MasterPingResponseResponse();
                public void handleResponse(MasterPingResponseResponse response) {
                    if (!running) {
                    // reset the counter, we got a good result
                    MasterFaultDetection.this.retryCount = 0;
                    // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                        // we don't stop on disconnection from master, we keep pinging it
                        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                public void handleException(TransportException exp) {
                    if (!running) {
                    synchronized (masterNodeMutex) {
                        // check if the master node did not get switched on us...
                        if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                            if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                            } else if (exp.getCause() instanceof NotMasterException) {
                                logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                notifyMasterFailure(masterToPing, exp, "no longer master");
                            } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
                                logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                notifyMasterFailure(masterToPing, exp,"not master");
                            } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                    , masterNode);
                                notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                            int retryCount = ++MasterFaultDetection.this.retryCount;
                            logger.trace(() -> new ParameterizedMessage(
                                    "[master] failed to ping [{}], retry [{}] out of [{}]",
                                    masterNode, retryCount, pingRetryCount), exp);
                            if (retryCount >= pingRetryCount) {
                                logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                    masterNode, pingRetryCount, pingRetryTimeout);
                                // not good, failure
                                notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                    + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                            } else {
                                // resend the request, not reschedule, rely on send timeout
                                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);

                public String executor() {
                    return ThreadPool.Names.SAME;

3.1 MasterFaultDetection连接异常处理


protected void handleTransportDisconnect(DiscoveryNode node) {
    synchronized (masterNodeMutex) {
        if (!node.equals(this.masterNode)) {
        if (connectOnNetworkDisconnect) {
            try {
                // if all is well, make sure we restart the pinger
                if (masterPinger != null) {
                this.masterPinger = new MasterPinger();
                // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
            } catch (Exception e) {
                logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
        } else {
            logger.trace("[master] [{}] transport disconnected", node);
            notifyMasterFailure(node, null, "transport disconnected");

3.2 主节点对非主节点ping请求的响应


public MasterFaultDetection(...) {

        MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
 private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

    public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
        final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
        // check if we are really the same master as the one we seemed to be think we are
        // this can happen if the master got "kill -9" and then another node started using the same port
        if (!request.masterNode.equals(nodes.getLocalNode())) {
            throw new ThisIsNotTheMasterYouAreLookingForException();

        // ping from nodes of version < 1.4.0 will have the clustername set to null
        if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                request.clusterName, clusterName);
            throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                + request.clusterName + "] cluster then us [" + clusterName + "]");

        // when we are elected as master or when a node joins, we use a cluster state update thread
        // to incorporate that information in the cluster state. That cluster state is published
        // before we make it available locally. This means that a master ping can come from a node
        // that has already processed the new CS but it is not known locally.
        // Therefore, if we fail we have to check again under a cluster state thread to make sure
        // all processing is finished.
        if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
            logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
            masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

                public ClusterState execute(ClusterState currentState) throws Exception {
                    // if we are no longer master, fail...
                    DiscoveryNodes nodes = currentState.nodes();
                    if (!nodes.nodeExists(request.sourceNode)) {
                        throw new NodeDoesNotExistOnMasterException();
                    return currentState;

                public void onNoLongerMaster(String source) {
                    onFailure(source, new NotMasterException("local node is not master"));

                public void onFailure(String source, @Nullable Exception e) {
                    if (e == null) {
                        e = new ElasticsearchException("unknown error while processing ping");
                    try {
                    } catch (IOException inner) {
                        logger.warn("error while sending ping response", inner);

                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    try {
                        channel.sendResponse(new MasterPingResponseResponse());
                    } catch (IOException e) {
                        logger.warn("error while sending ping response", e);
        } else {
            // send a response, and note if we are connected to the master or not
            channel.sendResponse(new MasterPingResponseResponse());

3.3 MasterFaultDetection处理主节点故障


private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true)) {
        try {
            threadPool.generic().execute(() -> {
                for (Listener listener : listeners) {
                    listener.onMasterFailure(masterNode, cause, reason);
        } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
        stop("master failure, " + reason);
public void stop(String reason) {
    synchronized (masterNodeMutex) {
        if (masterNode != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("[master] stopping fault detection against master [{}], reason [{}]", masterNode, reason);

private void innerStop() {
    // also will stop the next ping schedule
    this.retryCount = 0;
    if (masterPinger != null) {
        masterPinger = null;
    this.masterNode = null;


private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

    public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
        handleMasterGone(masterNode, cause, reason);

private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a master failure
    if (localNodeMaster()) {
        // we might get this on both a master telling us shutting down, and then the disconnect failure

    logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

    synchronized (stateMutex) {
        if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
            // flush any pending cluster states from old master, so it will not be set as master again
            pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
            rejoin("master left (reason = " + reason + ")");

4 注意








private class FDConnectionListener implements TransportConnectionListener {
    public void onNodeDisconnected(DiscoveryNode node) {
        AbstractRunnable runnable = new AbstractRunnable() {
            public void onFailure(Exception e) {
                logger.warn("failed to handle transport disconnect for node: {}", node);

            protected void doRun() {

上面2.3 NodesFaultDetection处理节点故障中说到Master节点在处理节点故障时会移除发生故障的节点,移除节点时需要处理该节点上面的Shard,如果该节点上有主分片,则还需要进行副分片提升,这些内容在《ElasticSearch 副分片提升》中介绍。


