ElasticSearch Master、Node故障检测

2018-12-16  本文已影响0人  persisting_

1 概述

在ElasticSearch中,负责检测主节点和其他非主节点状态的类是MasterFaultDetectionNodesFaultDetection,其中MasterFaultDetection是其他非主节点用于定时ping主节点的类,NodesFaultDetection是主节点用来定时ping其他非主节点的类,这里不要混淆。

1.1 NodesFaultDetectionMasterFaultDetection初始化

ZenDiscovery中初始化如下:

//ZenDiscovery 构造函数
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
//设置检测到主节点故障的Listener
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
//设置检测到其他非主节点故障的Listener
this.nodesFD.addListener(new NodeFaultDetectionListener());

因为主节点在使用MasterFaultDetection定时向其他节点发送ping报文时,不需要将该报文发送给自己,所以在ZenDiscovery.doStart中会设置MasterFaultDetection的自身节点。

 @Override
protected void doStart() {
    DiscoveryNode localNode = transportService.getLocalNode();
    assert localNode != null;
    synchronized (stateMutex) {
        ...
        committedState.set(initialState);
        clusterApplier.setInitialState(initialState);
        //这里设置MasterFaultDetection的自身节点
        nodesFD.setLocalNode(localNode);
        joinThreadControl.start();
    }
    zenPing.start();
}

1.2 MasterFaultDetectionNodesFaultDetection初始化的启动

MasterFaultDetectionNodesFaultDetection类对象在ZenDiscovery中被初始化。在ElasticSearch Master选举、自动Cluster发现我们介绍了ElasticSearch中的主分片选举和Cluster自动发现。

其实在某一节点成功当选为主节点后,会将状态发布到整个集群中,见如下代码:

//NodeJoinController.ElectionContext
public synchronized void closeAndBecomeMaster() {
    ...
    innerClose();

    Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
    final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

    tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
    tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
    //这里向MasterService提交一个节点状态更新任务
    masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

MasterService负责处理节点状态更新任务,所有引起集群状态更新的操作,最后都会向MasterService提交UpdateTask任务,MasterService则通过任务队列接受该任务并执行,最终计算出节点状态的改变,并通过ZenDiscovery发布到集群中的其他节点中。(关于MasterService对节点状态更新任务的处理后续会有文章介绍)

集群中的集群状态变更使用的是两阶段提交,先将集群状态发布到各个节点上,各个节点接收到集群状态后会先保存该状态,然后反馈发送状态信息的节点,发送状态信息的节点再根据收到的反馈情况发送提交请求。

在上面接受的两阶段提交协议中的提交阶段,会进行MasterFD和MasterFaultDetection的处理:

//ZenDiscovery
// return true if state has been sent to applier
boolean processNextCommittedClusterState(String reason) {
    ...
    // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
    // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
    //如果当前节点为主节点,则更新MasterFaultDetection中记录的其他非主节点,并发送Ping请求
    if (newClusterState.nodes().isLocalNodeElectedMaster()) {
        // update the set of nodes to ping
        nodesFD.updateNodesAndPing(newClusterState);
    } else {
        // check to see that we monitor the correct master of the cluster
        //如果当前节点不是主节点,则进行MasterFD的相关处理
        //如果原先记录的主节点为空(如集群刚启动)或者集群状态中声明的
        //主节点不当与本地记录的主节点,则更新MasterFD的主节点并重新启动
        if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
            masterFD.restart(newClusterState.nodes().getMasterNode(),
                "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
        }
    }
    ...
}

下面我们分开看MasterFaultDetectionMasterFaultDetection的相关逻辑。

2 NodesFaultDetection逻辑

在一开始已经说了NodesFaultDetection是主节点用于定时ping集群中其他非主节点的工具。在两阶段协议的提交中如果当天节点是主节点,则会调用NodesFaultDetection.updateNodesAndPing进行集群中已知节点更新并开启ping流程。

//NodesFaultDetection
/**
    * make sure that nodes in clusterState are pinged. Any pinging to nodes which are not
    * part of the cluster will be stopped
    */
public void updateNodesAndPing(ClusterState clusterState) {
    // remove any nodes we don't need, this will cause their FD to stop
    //首先移除新集群状态中不存在的节点,移除之后会停止向该节点发送ping请求
    //停止逻辑后续会介绍
    for (DiscoveryNode monitoredNode : nodesFD.keySet()) {
        if (!clusterState.nodes().nodeExists(monitoredNode)) {
            nodesFD.remove(monitoredNode);
        }
    }
    // add any missing nodes
    //添加新加入集群或以前未知的节点
    for (DiscoveryNode node : clusterState.nodes()) {
        //排除自己,避免向自己发送ping请求
        if (node.equals(localNode)) {
            // no need to monitor the local node
            continue;
        }
        if (!nodesFD.containsKey(node)) {
            NodeFD fd = new NodeFD(node);
            // it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up.
            nodesFD.put(node, fd);
            // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
            //对于每一个新加入被监控的节点,开启一个定时任务,定时发送
            //ping请求至该节点。
            threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);
        }
    }
}

从上面代码中可以知道NodesFaultDetection使用NodeFD记录需要ping的其他非主节点,维护了一个node->NodeFD的Map。NodeFD实现了Runnable接口,所以可以进行定时执行。

下面具体看下ping逻辑,上面说过NodeFD实现了Runnable接口,所以ping逻辑也就在其run方法中:

@Override
public void run() {
    //上面说到了移除此次ClusterState状态中不存在的节点会停止ping该节点
    //其具体实现逻辑就在这里,running的定义比较简单,即
    //return NodeFD.this.equals(nodesFD.get(node));
    //其函数里就是判断NodeFD关联的节点是否还在列表中,如果不在直接返回,停止ping
    if (!running()) {
        return;
    }
    //pingRetryTimeout即重试时间间隔由discovery.zen.fd.ping_timeout配置,默认30秒
    final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
        .withTimeout(pingRetryTimeout).build();
    //向该节点发送PING_ACTION_NAME请求,等下我们可以看下其他非主节点注册
    //处理PING_ACTION_NAME请求的handler
    transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, new TransportResponseHandler<PingResponse>() {
                @Override
                public PingResponse newInstance() {
                    return new PingResponse();
                }
                //如果成功收到目标节点响应,则等待pingInterVal之后发送下一个ping
                @Override
                public void handleResponse(PingResponse response) {
                    if (!running()) {
                        return;
                    }
                    //retryCount用于记录失败次数,如果连续失败小于指
                    //定次数则重试,否则报告节点故障。
                    retryCount = 0;
                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this);
                }
                //对异常的处理
                @Override
                public void handleException(TransportException exp) {
                    if (!running()) {
                        return;
                    }

                    if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                        //连接异常
                        handleTransportDisconnect(node);
                        return;
                    }
                    //记录失败次数
                    retryCount++;
                    ...
                    //pingRetryCount即最大连续失败次数,由discovery.zen.fd.ping_retries配置,默认3次
                    if (retryCount >= pingRetryCount) {
                        ...
                        // not good, failure
                        if (nodesFD.remove(node, NodeFD.this)) {
                            //失败次数大于配置的连续失败最大次数,则报告节点故障
                            notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum ["
                                + pingRetryTimeout + "] timeout");
                        }
                    } else {
                        // resend the request, not reschedule, rely on send timeout
                        //失败次数小于配置的最大连续失败次数,则立刻重新发送ping请求,
                        //后续如果得到响应成功则继续下一轮的ping,如果响应失败则累计失败次数,
                        //达到阈值时报告节点故障。
                        transportService.sendRequest(node, PING_ACTION_NAME, newPingRequest(), options, this);
                    }
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            }
    );
    }
}

2.1 NodesFaultDetection连接异常处理

如果发送ping请求发生ConnectTransportException异常会调用NodesFaultDetection.handleTransportDisconnect进行处理

//NodesFaultDetection.handleTransportDisconnect
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
    //首先从ping列表中移除该节点
    NodeFD nodeFD = nodesFD.remove(node);
    if (nodeFD == null) {
        return;
    }
    //connectOnNetworkDisconnect是一个配置项discovery.zen.fd.connect_on_network_disconnect
    //用来判断发生故障后是否需要自动重连
    if (connectOnNetworkDisconnect) {
        NodeFD fd = new NodeFD(node);
        try {
            //重新获取连接到该节点的Channel
            transportService.connectToNode(node);
            nodesFD.put(node, fd);
            // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
            //开启新的定义任务
            threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd);
        } catch (Exception e) {
            logger.trace("[node  ] [{}] transport disconnected (with verified connect)", node);
            // clean up if needed, just to be safe..
            //发生失败则报告节点故障
            nodesFD.remove(node, fd);
            notifyNodeFailure(node, "transport disconnected (with verified connect)");
        }
    } else {
        //如果discovery.zen.fd.connect_on_network_disconnect配置为不自动重连则直接报告节点故障
        logger.trace("[node  ] [{}] transport disconnected", node);
        notifyNodeFailure(node, "transport disconnected");
    }
}

2.2 目标(非主)节点对Master Ping请求的响应

Master节点会定时向非主节点发送PING_ACTION_NAME请求,我们可以看下该请求注册的handler其实是PingRequestHandler

//NodesFaultDetection
public NodesFaultDetection(...) {
    ...
    transportService.registerRequestHandler(
        PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, false, false, new PingRequestHandler());
}
//NodesFaultDetection.PingRequestHandler
class PingRequestHandler implements TransportRequestHandler<PingRequest> {
    @Override
    public void messageReceived(PingRequest request, TransportChannel channel, Task task) throws Exception {
        // if we are not the node we are supposed to be pinged, send an exception
        // this can happen when a kill -9 is sent, and another node is started using the same port
        if (!localNode.equals(request.targetNode())) {
            throw new IllegalStateException("Got pinged as node " + request.targetNode() + "], but I am node " + localNode );
        }

        // PingRequest will have clusterName set to null if it came from a node of version <1.4.0
        if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            // Don't introduce new exception for bwc reasons
            throw new IllegalStateException("Got pinged with cluster name [" + request.clusterName + "], but I'm part of cluster ["
                + clusterName + "]");
        }
        //表示当前节点收到了主节点的ping请求
        notifyPingReceived(request);
        //发送响应
        channel.sendResponse(new PingResponse());
    }
}

//NodesFaultDetection
private void notifyPingReceived(final PingRequest pingRequest) {
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            //这里的Listener是在ZenDiscovery设置的NodeFaultDetectionListener
            for (Listener listener : listeners) {
                listener.onPingReceived(pingRequest);
            }
        }

    });
}

ZenDiscovery设置的NodeFaultDetectionListener对ping请求的处理:

//ZenDiscovery.NodeFaultDetectionListener
@Override
public void onPingReceived(final NodesFaultDetection.PingRequest pingRequest) {
    // if we are master, we don't expect any fault detection from another node. If we get it
    // means we potentially have two masters in the cluster.
    //如果自己不是主节点,那么收到其他节点的ping请求很合理,直接返回即可
    //pingsWhileMaster下面介绍
    if (!localNodeMaster()) {
        pingsWhileMaster.set(0);
        return;
    }
    //到这里表示自己是主节点,但是收到了其他节点发过来的ping非主节点的请求。
    //这就表明集群中还存在其他的主节点(除自己之外还有另一个主节点).
    //pingsWhileMaster用来记录当自己为主节点时,收到其他“主节点”发送的ping非主节点请求的次数.
    //如果收到的“非法”ping请求小于指定次数,则暂时不处理
    //maxPingsFromAnotherMaster是通过配置discovery.zen.max_pings_from_another_master配置的
    if (pingsWhileMaster.incrementAndGet() < maxPingsFromAnotherMaster) {
        logger.trace("got a ping from another master {}. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
        return;
    }
    logger.debug("got a ping from another master {}. resolving who should rejoin. current ping count: [{}]", pingRequest.masterNode(), pingsWhileMaster.get());
    //如果超过指定配置的最大其他主节点ping次数,则断定还存在另一个主节点
    //使用handleAnotherMaster处理
    synchronized (stateMutex) {
        ClusterState currentState = committedState.get();
        if (currentState.nodes().isLocalNodeElectedMaster()) {
            pingsWhileMaster.set(0);
            handleAnotherMaster(currentState, pingRequest.masterNode(), pingRequest.clusterStateVersion(), "node fd ping");
        }
    }
}

//ZenDiscovery
private void handleAnotherMaster(...) {
        ...
        assert Thread.holdsLock(stateMutex);
        //如果其他节点版本号大于本地版本号,则表示自己的状态是过时的,只要本地重新加入其他节点即可。
        //rejoin会启动本地的选举逻辑,因为本地状态过时,所以此次选举其他节点可能已经有一个被大多数节点承认的主节点,
        //选举会可能会直接获取到其他节点维护的主节点。
        if (otherClusterStateVersion > localClusterState.version()) {
            rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]");
        } else {
            //如果本地版本号最大,即本地状态最新,则向另一个节点发送DISCOVERY_REJOIN_ACTION_NAME请求
            //该节点收到DISCOVERY_REJOIN_ACTION_NAME请求后会调用rejoin
            // TODO: do this outside mutex
            logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason);
            try {
                // make sure we're connected to this node (connect to node does nothing if we're already connected)
                // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node
                // in the past (after a master failure, for example)
                transportService.connectToNode(otherMaster);
                transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                    @Override
                    public void handleException(TransportException exp) {
                        logger.warn(() -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp);
                    }
                });
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e);
            }
        }
    }

2.3 NodesFaultDetection处理节点故障

通过上面的描述可以知道,当发送通信异常且配置为不自动重连或配置为自动重连但连接失败会判断节点故障,还有当连续重试指定次数ping失败,也会判断目标节点故障。

节点故障使用NodesFaultDetection.notifyNodeFailure处理:

//NodesFaultDetection
private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
    try {
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                //这里的Listener同样是在ZenDiscovery设置的NodeFaultDetectionListener
                for (Listener listener : listeners) {
                    listener.onNodeFailure(node, reason);
                }
            }
        });
    } catch (EsRejectedExecutionException ex) {
        logger.trace(() -> new ParameterizedMessage(
                "[node  ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", node, reason), ex);
    }
}

//NodesFaultDetection.NodeFaultDetectionListener
 @Override
public void onNodeFailure(DiscoveryNode node, String reason) {
    handleNodeFailure(node, reason);
}

//ZenDiscovery
private void handleNodeFailure(final DiscoveryNode node, final String reason) {
    //自身(主节点)没有启动,则忽略
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a node failure
        return;
    }
    //本节点不是主节点,则忽略,这个有可能是自己还是主节点时发送的ping请求,但是此时自己已经不是主节点了
    if (!localNodeMaster()) {
        // nothing to do here...
        return;
    }
    //移除该节点
    removeNode(node, "zen-disco-node-failed", reason);
}
//ZenDiscovery
 private void removeNode(final DiscoveryNode node, final String source, final String reason) {
    //通知集群中其他节点移除该节点,这里涉及到副分片提升,将在另一篇文章介绍
    masterService.submitStateUpdateTask(
            source + "(" + node + "), reason(" + reason + ")",
            new NodeRemovalClusterStateTaskExecutor.Task(node, reason),
            ClusterStateTaskConfig.build(Priority.IMMEDIATE),
            nodeRemovalExecutor,
            nodeRemovalExecutor);
}

3 MasterFaultDetection逻辑

MasterFaultDetection是其他非节点用来定时ping主节点的工具类。在一开始描述的两阶段提交协议中,如果自身为非主节点,则在MasterFaultDetection为空时(集群刚启动)或者MasterFaultDetection中记录的主节点不是状态中的主节点时(主节点变动)会调用MasterFaultDetection.restart

//MasterFaultDetection
 public void restart(DiscoveryNode masterNode, String reason) {
    synchronized (masterNodeMutex) {
        if (logger.isDebugEnabled()) {
            logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
        }
        //首先停止、重置目前的状态
        innerStop();
        //开始新一轮的主节点Ping
        innerStart(masterNode);
    }
}

 private void innerStop() {
    // also will stop the next ping schedule
    //重试次数置零
    this.retryCount = 0;
    //masterPinger是用来ping主节点的定时任务
    //先停止masterPinger
    if (masterPinger != null) {
        masterPinger.stop();
        masterPinger = null;
    }
    this.masterNode = null;
}

private void innerStart(final DiscoveryNode masterNode) {
    //记录当前需要ping的主节点
    this.masterNode = masterNode;
    //重试次数置零
    this.retryCount = 0;
    this.notifiedMasterFailure.set(false);
    //如果masterPinger不为空,则先停止
    if (masterPinger != null) {
        masterPinger.stop();
    }
    //新建一个新的masterPinger
    this.masterPinger = new MasterPinger();

    // we start pinging slightly later to allow the chosen master to complete it's own master election
    //开始ping主节点的定时任务,pingInterval是ping时间间隔,由
    //discovery.zen.fd.ping_interval配置,默认1秒
    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger);
}

NodeFD一样MasterPinger也实现了Runnable接口,作为定时任务被调度,所以MasterFaultDetection逻辑主要在MasterPinger.run中:

//MasterFaultDetection.MasterPinger
 @Override
public void run() {
    //stop后会置running为false
    if (!running) {
        // return and don't spawn...
        return;
    }
    final DiscoveryNode masterToPing = masterNode;
    //如果当前记录主节点为空,则直接设置下一个ping主节点的定时任务
    if (masterToPing == null) {
        // master is null, should not happen, but we are still running, so reschedule
        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
        return;
    }

    final MasterPingRequest request = new MasterPingRequest(
        clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
    //pingRetryTimeout即失败后的重试间隔时间,由discovery.zen.fd.ping_timeout配置,默认30秒
    final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
        .withTimeout(pingRetryTimeout).build();
    //向主节点发送MASTER_PING_ACTION_NAME请求
    transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
        new TransportResponseHandler<MasterPingResponseResponse>() {
                @Override
                public MasterPingResponseResponse newInstance() {
                    return new MasterPingResponseResponse();
                }
                //成功接收主节点响应
                @Override
                public void handleResponse(MasterPingResponseResponse response) {
                    if (!running) {
                        return;
                    }
                    // reset the counter, we got a good result
                    //失败重试次数置零
                    MasterFaultDetection.this.retryCount = 0;
                    // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
                    //如果主节点没有变化,则启动下一个定时任务,
                    //pingInterval即ping时间间隔,由discovery.zen.fd.ping_interval
                    //配置默认1秒
                    if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                        // we don't stop on disconnection from master, we keep pinging it
                        threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                    }
                }
                //异常处理
                @Override
                public void handleException(TransportException exp) {
                    if (!running) {
                        return;
                    }
                    synchronized (masterNodeMutex) {
                        // check if the master node did not get switched on us...
                        if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                            if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
                                //通信异常处理
                                handleTransportDisconnect(masterToPing);
                                return;
                            } else if (exp.getCause() instanceof NotMasterException) {
                                logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
                                //发送的目标节点不在是主节点,即主节点已经重新选举,报告主节点故障
                                notifyMasterFailure(masterToPing, exp, "no longer master");
                                return;
                            } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
                                logger.debug("[master] pinging a master {} that is not the master", masterNode);
                                //目标节点不是主节点,报告主节点故障
                                notifyMasterFailure(masterToPing, exp,"not master");
                                return;
                            } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
                                logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
                                    , masterNode);
                                //目标节点不是主节点,报告主节点故障
                                notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
                                return;
                            }
                            //不是上面列出的异常,则ping重试次数加一
                            int retryCount = ++MasterFaultDetection.this.retryCount;
                            logger.trace(() -> new ParameterizedMessage(
                                    "[master] failed to ping [{}], retry [{}] out of [{}]",
                                    masterNode, retryCount, pingRetryCount), exp);
                            //如果重试次数大于配置的最大重试次数,则断点主节点异常,报告主节点发生故障
                            //pingRetryCount配置的最大重试次数,
                            //由discovery.zen.fd.ping_retries配置,默认3次。
                            if (retryCount >= pingRetryCount) {
                                logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
                                    masterNode, pingRetryCount, pingRetryTimeout);
                                // not good, failure
                                notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
                                    + "] times, each with  maximum [" + pingRetryTimeout + "] timeout");
                            } else {
                                //如果重试次数还没有达到配置的最大重试次数,则等待重试时间间隔后再次发送ping至主节点。
                                // resend the request, not reschedule, rely on send timeout
                                transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
                            }
                        }
                    }
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            }
    );
}
}

3.1 MasterFaultDetection连接异常处理

如果发生通信异常,则会使用MasterFaultDetection.handleTransportDisconnect处理

//MasterFaultDetection
@Override
protected void handleTransportDisconnect(DiscoveryNode node) {
    synchronized (masterNodeMutex) {
        if (!node.equals(this.masterNode)) {
            return;
        }
        //和主节点ping其他非主节点一样,非主节点ping主节点通信异常也由配置
        //discovery.zen.fd.connect_on_network_disconnect配置是否自动重连
        if (connectOnNetworkDisconnect) {
            try {
                //重新打开至主节点的channel
                transportService.connectToNode(node);
                // if all is well, make sure we restart the pinger
                if (masterPinger != null) {
                    masterPinger.stop();
                }
                //开启新的定时任务ping主节点
                this.masterPinger = new MasterPinger();
                // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
                threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger);
            } catch (Exception e) {
                logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
                //发生异常则报告主节点故障
                notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
            }
        } else {
            //如果配置为不自动重连,则直接报告主节点故障
            logger.trace("[master] [{}] transport disconnected", node);
            notifyMasterFailure(node, null, "transport disconnected");
        }
    }
}

3.2 主节点对非主节点ping请求的响应

非主节点向主节点发送MASTER_PING_ACTION_NAME请求,主节点注册的handler为MasterPingRequestHandler

//MasterFaultDetection
public MasterFaultDetection(...) {
    ...

    transportService.registerRequestHandler(
        MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
}
//MasterFaultDetection.MasterPingRequestHandler
 private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

    @Override
    public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
        final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
        // check if we are really the same master as the one we seemed to be think we are
        // this can happen if the master got "kill -9" and then another node started using the same port
        //如果当前节点不是请求中注明的主接点,则发送ThisIsNotTheMasterYouAreLookingForException异常
        if (!request.masterNode.equals(nodes.getLocalNode())) {
            throw new ThisIsNotTheMasterYouAreLookingForException();
        }

        // ping from nodes of version < 1.4.0 will have the clustername set to null
        //当前节点集群名称和请求中注明的集群名称不一致
        if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
                request.clusterName, clusterName);
            throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
                + request.clusterName + "] cluster then us [" + clusterName + "]");
        }

        // when we are elected as master or when a node joins, we use a cluster state update thread
        // to incorporate that information in the cluster state. That cluster state is published
        // before we make it available locally. This means that a master ping can come from a node
        // that has already processed the new CS but it is not known locally.
        // Therefore, if we fail we have to check again under a cluster state thread to make sure
        // all processing is finished.
        //处理自己为非主节点但是却收到其他节点需要发送给主节点ping请求的异常以及发送请求的节点不存在的异常
        //向集群中其他节点广播该状态变更
        if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
            logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
            masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    // if we are no longer master, fail...
                    DiscoveryNodes nodes = currentState.nodes();
                    if (!nodes.nodeExists(request.sourceNode)) {
                        throw new NodeDoesNotExistOnMasterException();
                    }
                    return currentState;
                }

                @Override
                public void onNoLongerMaster(String source) {
                    onFailure(source, new NotMasterException("local node is not master"));
                }

                @Override
                public void onFailure(String source, @Nullable Exception e) {
                    if (e == null) {
                        e = new ElasticsearchException("unknown error while processing ping");
                    }
                    try {
                        channel.sendResponse(e);
                    } catch (IOException inner) {
                        inner.addSuppressed(e);
                        logger.warn("error while sending ping response", inner);
                    }
                }

                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    try {
                        channel.sendResponse(new MasterPingResponseResponse());
                    } catch (IOException e) {
                        logger.warn("error while sending ping response", e);
                    }
                }
            });
        } else {
            //正常情况则发送响应给发送请求的节点
            // send a response, and note if we are connected to the master or not
            channel.sendResponse(new MasterPingResponseResponse());
        }
    }
}

3.3 MasterFaultDetection处理主节点故障

通过上面的过程可知,当发生连接异常且配置为不自动重连或者配置为自动重连但连接失败以及其他主节点故障时,会使用MasterFaultDetection.notifyMasterFailure进行处理:

//MasterFaultDetection
private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    if (notifiedMasterFailure.compareAndSet(false, true)) {
        try {
            threadPool.generic().execute(() -> {
                //Listener为在ZenDiscovery注册的MasterNodeFailureListener
                for (Listener listener : listeners) {
                    listener.onMasterFailure(masterNode, cause, reason);
                }
            });
        } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
        }
        //停止ping主节点的定时任务
        stop("master failure, " + reason);
    }
}
public void stop(String reason) {
    synchronized (masterNodeMutex) {
        if (masterNode != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("[master] stopping fault detection against master [{}], reason [{}]", masterNode, reason);
            }
        }
        innerStop();
    }
}

private void innerStop() {
    // also will stop the next ping schedule
    this.retryCount = 0;
    if (masterPinger != null) {
        masterPinger.stop();
        masterPinger = null;
    }
    this.masterNode = null;
}

MasterNodeFailureListener处理主节点故障逻辑如下:

//ZenDiscovery.MasterNodeFailureListener
private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

    @Override
    public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
        handleMasterGone(masterNode, cause, reason);
    }
}

//ZenDiscovery
private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
    //如果本节点(非主节点)还未启动,忽略
    if (lifecycleState() != Lifecycle.State.STARTED) {
        // not started, ignore a master failure
        return;
    }
    //本节点就是主节点,忽略
    if (localNodeMaster()) {
        // we might get this on both a master telling us shutting down, and then the disconnect failure
        return;
    }

    logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

    synchronized (stateMutex) {
        //如果本节点不是主节点并且当前记录的节点等于最新状态中的主节点,则进入rejoin流程
        if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
            // flush any pending cluster states from old master, so it will not be set as master again
            pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
            rejoin("master left (reason = " + reason + ")");
        }
    }
}

4 注意

MasterFaultDetectionNodesFaultDetection都继承自FaultDetection,所以下列配置主节点ping非主节点和非主节点ping主节点是相同的,因为都在基类FaultDetection中定义:

discovery.zen.fd.connect_on_network_disconnect:发生连接故障时是否自动重连,默认为false。

discovery.zen.fd.ping_interval:发送ping的时间间隔,默认1秒。

discovery.zen.fd.ping_timeout:重试时间间隔,默认30秒。

discovery.zen.fd.ping_retries:ping失败重试次数,默认3次。

discovery.zen.fd.register_connection_listener:是否要在TransportService中注册节点连接故障Listener。默认为true。

discovery.zen.fd.register_connection_listener注册处理节点连接故障的Listener为FaultDetection.FDConnectionListener,如果节点发生连接异常,会直接调用FaultDetection的子类MasterFaultDetectionNodesFaultDetection中定义的handleTransportDisconnect进行连接异常处理,上面已经介绍过了。

//FaultDetection.FDConnectionListener
private class FDConnectionListener implements TransportConnectionListener {
    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        AbstractRunnable runnable = new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
                logger.warn("failed to handle transport disconnect for node: {}", node);
            }

            @Override
            protected void doRun() {
                //子类MasterFaultDetection和NodesFaultDetection会对handleTransportDisconnect进行重写
                handleTransportDisconnect(node);
            }
        };
        threadPool.generic().execute(runnable);
    }
}

上面2.3 NodesFaultDetection处理节点故障中说到Master节点在处理节点故障时会移除发生故障的节点,移除节点时需要处理该节点上面的Shard,如果该节点上有主分片,则还需要进行副分片提升,这些内容在ElasticSearch 副分片提升介绍。

上一篇下一篇

猜你喜欢

热点阅读