elasticsearch源码分析

Elasticsearch源码分析-索引分析(一)

2018-12-13  本文已影响0人  尹亮_36cd

1. 一个简单的索引请求示例

首先,我们来看一个索引请求:

curl -XPUT 127.0.0.1:9200/item/show/28589790
{
   "id": 28589790,
   "text": "这是一个索引文本"
}

这个请求的主要作用是向item索引中添加一个索引文档,文档信息:
文档 id: 28589790
字段id: 28589790
字段text: 这是一个索引文本
如果索引中已经包含id为28589790的文档,elasticsearch将会使用这条数据覆盖原有文档

2. 索引时序图

3. 索引请求转发

  1. 在elasticsearch启动时,会注入RestIndexAction对象,并且会把请求方法、URI和当前handler对象注册到内存中
/**
 * REST handler for document index requests. The constructor binds the plain
 * index routes to this handler and the {@code _create} routes to a
 * {@code CreateHandler}.
 */
public class RestIndexAction extends BaseRestHandler {
    /**
     * Registers every HTTP method / path pair shown for the index API on the controller.
     */
    @Inject
    public RestIndexAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);

        // Plain index routes are served by this handler itself.
        controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
        controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
        controller.registerHandler(POST, "/{index}/{type}/{id}", this);

        // Both _create routes share a single dedicated handler instance.
        final CreateHandler createOnly = new CreateHandler(settings, controller, client);
        controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createOnly);
        controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createOnly);
    }
}

elasticsearch使用HttpRequestHandler.messageReceived()方法接受用户请求,然后调用dispatchRequest()方法对请求进行转发。
当请求跳转到RestController时,会调用getHandler()方法根据请求的Path获取对应的handler,由上文可以看出item/show/28589790 会匹配到RestIndexAction

public class RestController extends AbstractLifecycleComponent<RestController> {
    /**
     * Hands the request to the handler resolved by {@code getHandler}, or
     * answers directly on the channel when no handler matches.
     *
     * @param request the incoming REST request
     * @param channel the channel used to send the response
     */
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler target = getHandler(request);
        if (target != null) {
            target.handleRequest(request, channel);
            return;
        }

        if (request.method() == RestRequest.Method.OPTIONS) {
            // An OPTIONS request without a handler is simply answered with OK
            // (the original note says the Access Control Origin header gets added automatically).
            channel.sendResponse(new BytesRestResponse(OK));
            return;
        }

        final String message = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
        channel.sendResponse(new BytesRestResponse(BAD_REQUEST, message));
    }
}

handler.handleRequest()方法最终会调用RestIndexAction.handleRequest()方法对索引参数进行解析,创建索引请求对象indexRequest,然后调用client.index()开始创建索引

public class RestIndexAction extends BaseRestHandler {
    /**
     * Builds an {@code IndexRequest} from the REST parameters and body, then
     * executes it through the client and renders the result on the channel.
     *
     * <p>Parameters read from the request: {@code index}, {@code type}, {@code id},
     * {@code routing}, {@code parent}, {@code timestamp}, {@code ttl}, {@code timeout},
     * {@code refresh}, {@code version_type}, {@code op_type}, {@code replication}
     * and {@code consistency}; the version is parsed by {@code RestActions.parseVersion}.
     *
     * <p>An unparsable {@code op_type} is answered with {@code BAD_REQUEST} and the
     * request is not indexed.
     *
     * @param request the REST request carrying parameters and the document source
     * @param channel the channel the response is written to
     * @param client  the client used to execute the index request
     */
    @Override
    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
        IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
        indexRequest.listenerThreaded(false);
        indexRequest.operationThreaded(true);
        indexRequest.routing(request.param("routing"));
        indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
        indexRequest.timestamp(request.param("timestamp"));
        if (request.hasParam("ttl")) {
            indexRequest.ttl(request.paramAsTime("ttl", null).millis());
        }
        indexRequest.source(request.content());
        indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
        indexRequest.refresh(request.paramAsBoolean("refresh", indexRequest.refresh()));
        indexRequest.version(RestActions.parseVersion(request));
        indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
        String sOpType = request.param("op_type");
        if (sOpType != null) {
            try {
                indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
            } catch (ElasticsearchIllegalArgumentException eia) {
                try {
                    XContentBuilder builder = channel.newErrorBuilder();
                    channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", eia.getMessage()).endObject()));
                } catch (IOException e1) {
                    logger.warn("Failed to send response", e1);
                }
                // The request is invalid and has already been answered (or answering failed):
                // stop here so the document is not indexed and no second response is sent.
                return;
            }
        }
        String replicationType = request.param("replication");
        if (replicationType != null) {
            indexRequest.replicationType(ReplicationType.fromString(replicationType));
        }
        String consistencyLevel = request.param("consistency");
        if (consistencyLevel != null) {
            indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
        }
        client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
            @Override
            public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
                builder.startObject()
                        .field(Fields._INDEX, response.getIndex())
                        .field(Fields._TYPE, response.getType())
                        .field(Fields._ID, response.getId())
                        .field(Fields._VERSION, response.getVersion())
                        .field(Fields.CREATED, response.isCreated());
                builder.endObject();
                // CREATED when the document was newly created, OK otherwise.
                RestStatus status = OK;
                if (response.isCreated()) {
                    status = CREATED;
                }
                return new BytesRestResponse(status, builder);
            }
        });
    }
}

在索引请求中,支持下列参数:
routing: 路由信息,具有相同路由信息的文档存储在同一分片上
parent: 文档的parent id, 如果未设置路由,则会自动将其设置为路由
timestamp: 文档产生的时间戳
ttl: 过期时间
timeout: 超时时间
refresh: 此索引操作之后是否执行刷新,从而使文档可被搜索,默认为false
version: 文档的版本号
version_type: 版本类型,默认internal,支持internal、external、external_gt、external_gte和force
op_type: 索引操作类型,支持create和index
replication: 副本类型,支持async、sync和default
consistency: 一致性,支持one、quorum、all和default
请求的content即索引的source,文档内容
在封装完索引请求后,就要调用 client.index() 执行索引

4. 创建索引入口

在index()方法中,使用的Action是IndexAction.INSTANCE

public abstract class AbstractClient implements Client {
    /**
     * Executes an index request by delegating to {@code execute} with
     * {@code IndexAction.INSTANCE} as the action.
     *
     * @param request  the index request to run
     * @param listener receives the index response or the failure
     */
    @Override
    public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        execute(IndexAction.INSTANCE, request, listener);
    }
}

这个action在ActionModule中被TransportIndexAction注册

public class ActionModule extends AbstractModule {
    /**
     * Registers {@code IndexAction.INSTANCE} together with {@code TransportIndexAction}
     * as its transport-level implementation class.
     */
    @Override
    protected void configure() {
        registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
    }
}

因此在NodeClient的execute()方法中根据action获取到的transport action为TransportIndexAction

public class NodeClient extends AbstractClient {
    /**
     * Applies the client headers to the request, looks up the transport action
     * stored in {@code actions} for the given action, and executes it.
     *
     * @param action   the action used as lookup key
     * @param request  the request to execute
     * @param listener receives the response or the failure
     */
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        // For an index request the article notes this resolves to TransportIndexAction.
        TransportAction<Request, Response> transportAction = actions.get((ClientAction)action);  // TransportIndexAction
        transportAction.execute(request, listener);
    }
}

由于TransportIndexAction继承了TransportAction,因此调用过程为NodeClient.execute() -> TransportAction.execute() -> TransportIndexAction.doExecute()
索引的大体流程为:先判断是否需要创建索引,如果是则先创建索引,然后写入文档数据,否则直接写入文档数据

public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
    /**
     * Indexes the document, first auto-creating the target index when
     * {@code autoCreateIndex.shouldAutoCreate} says so for the current cluster state.
     *
     * @param request  the index request
     * @param listener receives the index response or the failure
     */
    @Override
    protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
        final boolean createFirst = autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state());
        if (!createFirst) {
            innerExecute(request, listener);
            return;
        }

        // Build the create-index request from the index request.
        final CreateIndexRequest creation = new CreateIndexRequest(request);
        creation.index(request.index());
        creation.mapping(request.type());
        creation.cause("auto(index api)");
        creation.masterNodeTimeout(request.timeout());

        createIndexAction.execute(creation, new ActionListener<CreateIndexResponse>() {
            @Override
            public void onResponse(CreateIndexResponse result) {
                innerExecute(request, listener);
            }

            @Override
            public void onFailure(Throwable e) {
                final boolean alreadyExists = ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException;
                if (!alreadyExists) {
                    listener.onFailure(e);
                    return;
                }
                // we have the index, do it
                try {
                    innerExecute(request, listener);
                } catch (Throwable e1) {
                    listener.onFailure(e1);
                }
            }
        });
    }
}

elasticsearch主要使用AutoCreateIndex.shouldAutoCreate()方法来判断是否需要创建索引

/**
 * Decides whether a missing index may be created automatically, based on the
 * {@code action.auto_create_index} setting.
 */
public class AutoCreateIndex {
    /**
     * Parses {@code action.auto_create_index}: an absent or explicitly true value
     * allows every index, an explicitly false value disables auto-creation, and
     * any other value is read as a comma-delimited list of patterns.
     */
    public AutoCreateIndex(Settings settings) {
        final String value = settings.get("action.auto_create_index");

        boolean disabled = false;
        String[] patterns = null;
        String[] stripped = null;

        if (value == null || Booleans.isExplicitTrue(value)) {
            // unrestricted: keep the defaults above
        } else if (Booleans.isExplicitFalse(value)) {
            disabled = true;
        } else {
            patterns = Strings.commaDelimitedListToStringArray(value);
            stripped = new String[patterns.length];
            // Pre-compute every pattern without its first character (the +/- marker, when present).
            for (int i = 0; i < patterns.length; i++) {
                stripped[i] = patterns[i].substring(1);
            }
        }

        needToCheck = !disabled;
        globallyDisabled = disabled;
        matches = patterns;
        matches2 = stripped;
    }

    /**
     * Returns whether {@code index} should be auto-created for the given cluster state.
     * Patterns are tried in order and the first match decides: a pattern starting
     * with {@code -} rejects, any other matching pattern accepts; no match rejects.
     */
    public boolean shouldAutoCreate(String index, ClusterState state) {
        if (!needToCheck || state.metaData().hasConcreteIndex(index) || globallyDisabled) {
            return false;
        }
        if (matches == null) {
            return true;
        }
        for (int i = 0; i < matches.length; i++) {
            final String rule = matches[i];
            final char marker = rule.charAt(0);
            final boolean prefixed = marker == '-' || marker == '+';
            final String pattern = prefixed ? matches2[i] : rule;
            if (Regex.simpleMatch(pattern, index)) {
                return marker != '-';
            }
        }
        return false;
    }
}

其中参数和globallyDisabled的含义:
action.auto_create_index: elasticsearch配置文件中的配置项,表示是否允许自动创建索引
needToCheck: 是否需要检查能否创建索引,只有当action.auto_create_index为false时不需要检查,直接返回无法创建索引
globallyDisabled: 是否全局禁用创建索引,只有当action.auto_create_index为false时全局禁用创建索引,直接返回无法创建索引
如果当前集群中已经包含了要创建的索引,那么也不需要创建索引。其他情况则根据action.auto_create_index配置的正则表达式来判断
如果允许创建索引,则开始创建索引名的流程

5. 创建索引名

首先构造创建索引的请求createIndexRequest,设置了4个参数,分别是索引名index、索引mapping、创建索引的原因cause和master节点超时时间masterNodeTimeout

// Create the create-index request from the index request.
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
// Target index name: the index named in the index request.
createIndexRequest.index(request.index());
// Pass the document type of the index request to mapping().
createIndexRequest.mapping(request.type());
// Record why the index is being created.
createIndexRequest.cause("auto(index api)");
// Use the index request's timeout as the master-node timeout.
createIndexRequest.masterNodeTimeout(request.timeout());

然后开始调用createIndexAction.execute()方法创建索引名

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
    public final void execute(Request request, ActionListener<Response> listener) {
        if (forceThreadedListener()) {
            request.listenerThreaded(true);
        }
        if (request.listenerThreaded()) {
            listener = new ThreadedActionListener<>(threadPool, listener, logger);
        }

        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (filters.length == 0) {
            try {
                // TransportAction 子类都要重写这个方法
                doExecute(request, listener);
            } catch(Throwable t) {
                logger.trace("Error during transport action execution.", t);
                listener.onFailure(t);
            }
        } else {
            RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(actionName, request, listener);
        }
    }

    rotected abstract void doExecute(Request request, ActionListener<Response> listener);
}

从下面的类图可以看出,TransportCreateIndexAction继承了TransportMasterNodeOperationAction,调用过程即TransportAction.execute() -> TransportMasterNodeOperationAction.doExecute()方法来完成操作


TransportCreateIndexAction类图

在TransportMasterNodeOperationAction中主要是保证操作在master节点上执行

/**
 * Base class for actions whose work is done by {@code masterOperation}: the
 * request is executed locally when the local node is master (or
 * {@code localExecute(request)} is true), and is otherwise sent to the known
 * master node through the transport service.
 */
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends TransportAction<Request, Response> {
    /**
     * Starts the operation with a fresh cluster state observer (bounded by the
     * request's master node timeout) and {@code retrying = false}.
     */
    @Override
    protected void doExecute(final Request request, final ActionListener<Response> listener) {
        innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
    }

    /**
     * Runs the operation against the state currently seen by the observer.
     *
     * @param request  the request to execute
     * @param listener receives the response or the failure
     * @param observer supplies the observed cluster state and waits for changes
     * @param retrying true when this call is the retry after "no known master";
     *                 a second miss then fails with MasterNotDiscoveredException
     */
    private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
        final ClusterState clusterState = observer.observedState();
        final DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.localNodeMaster() || localExecute(request)) {
            // Local execution path: check cluster blocks first.
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                    return;
                }
                // Retryable block: wait for a state change, then run this method again.
                logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
                observer.waitForNextChange(
                        new ClusterStateObserver.Listener() {
                            @Override
                            public void onNewClusterState(ClusterState state) {
                                innerExecute(request, listener, observer, false);
                            }

                            @Override
                            public void onClusterServiceClose() {
                                listener.onFailure(blockException);
                            }

                            @Override
                            public void onTimeout(TimeValue timeout) {
                                listener.onFailure(blockException);
                            }
                        }, new ClusterStateObserver.ValidationPredicate() {
                            @Override
                            protected boolean validate(ClusterState newState) {
                                // Accept a new state once the block is gone or is no longer retryable.
                                ClusterBlockException blockException = checkBlock(request, newState);
                                return (blockException == null || !blockException.retryable());
                            }
                        }
                );

            } else {
                // No block: run masterOperation on the configured executor.
                try {
                    threadPool.executor(executor).execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                masterOperation(request, clusterService.state(), listener);
                            } catch (Throwable e) {
                                listener.onFailure(e);
                            }
                        }
                    });
                } catch (Throwable t) {
                    listener.onFailure(t);
                }
            }
        } else {
            // Remote execution path: the request has to go to the master node.
            if (nodes.masterNode() == null) {
                if (retrying) {
                    listener.onFailure(new MasterNotDiscoveredException());
                } else {
                    logger.debug("no known master node, scheduling a retry");
                    observer.waitForNextChange(
                            new ClusterStateObserver.Listener() {
                                @Override
                                public void onNewClusterState(ClusterState state) {
                                    // the cluster state changed, run this method again (as a retry)
                                    innerExecute(request, listener, observer, true);
                                }

                                @Override
                                public void onClusterServiceClose() {
                                    listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                }

                                @Override
                                public void onTimeout(TimeValue timeout) {
                                    listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
                                }
                            }, new ClusterStateObserver.ChangePredicate() {
                                @Override
                                public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
                                                     ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
                                    return newState.nodes().masterNodeId() != null;
                                }

                                @Override
                                public boolean apply(ClusterChangedEvent event) {
                                    return event.nodesDelta().masterNodeChanged();
                                }
                            }
                    );
                }
                return;
            }
            processBeforeDelegationToMaster(request, clusterState);

            // Forward the request to the master node and relay its answer to the listener.
            transportService.sendRequest(nodes.masterNode(), actionName, request, new BaseTransportResponseHandler<Response>() {
                @Override
                public Response newInstance() {
                    return newResponse();
                }

                @Override
                public void handleResponse(Response response) {
                    listener.onResponse(response);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleException(final TransportException exp) {
                    if (exp.unwrapCause() instanceof ConnectTransportException) {
                        // we want to retry here a bit to see if a new master is elected
                        logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
                                nodes.masterNode(), exp.getDetailedMessage());
                        observer.waitForNextChange(new ClusterStateObserver.Listener() {
                                                       @Override
                                                       public void onNewClusterState(ClusterState state) {
                                                           innerExecute(request, listener, observer, false);
                                                       }

                                                       @Override
                                                       public void onClusterServiceClose() {
                                                           listener.onFailure(new NodeClosedException(clusterService.localNode()));
                                                       }

                                                       @Override
                                                       public void onTimeout(TimeValue timeout) {
                                                           listener.onFailure(new MasterNotDiscoveredException());
                                                       }
                                                   }, new ClusterStateObserver.EventPredicate() {
                                                       @Override
                                                       public boolean apply(ClusterChangedEvent event) {
                                                           // Only react to state changes in which the master node changed.
                                                           return event.nodesDelta().masterNodeChanged();
                                                       }
                                                   }
                        );
                    } else {
                        listener.onFailure(exp);
                    }
                }
            });
        }
    }
}

这个操作主要保证了两点:
(1)如果当前节点不是master,则将请求发送到master节点执行masterOperation()方法
(2)如果当前集群block了,则等待集群状态更新,然后重新执行完整的innerExecute()方法

然后进入到TransportCreateIndexAction.masterOperation()方法中,创建CreateIndexClusterStateUpdateRequest对象,用来创建索引时更新集群状态信息的请求,其中settings和mappings及aliases默认为空集合

public class TransportCreateIndexAction extends TransportMasterNodeOperationAction<CreateIndexRequest, CreateIndexResponse> {
    private final MetaDataCreateIndexService createIndexService;

    /**
     * Translates the create-index request into a cluster state update request
     * and hands it to {@code createIndexService}.
     *
     * @param request  the create-index request
     * @param state    the cluster state passed in by the caller (not read here)
     * @param listener receives the acknowledgement or the failure
     */
    @Override
    protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        // An empty cause is reported as "api".
        final String requestedCause = request.cause();
        final String cause = requestedCause.length() == 0 ? "api" : requestedCause;

        final CreateIndexClusterStateUpdateRequest updateRequest =
                new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                        .ackTimeout(request.timeout())
                        .masterNodeTimeout(request.masterNodeTimeout())
                        .settings(request.settings())
                        .mappings(request.mappings())
                        .aliases(request.aliases())
                        .customs(request.customs());

        // run the index creation
        createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                final boolean acknowledged = response.isAcknowledged();
                listener.onResponse(new CreateIndexResponse(acknowledged));
            }

            @Override
            public void onFailure(Throwable t) {
                // "Index already exists" is logged at trace level, everything else at debug.
                if (!(t instanceof IndexAlreadyExistsException)) {
                    logger.debug("[{}] failed to create", t, request.index());
                } else {
                    logger.trace("[{}] failed to create", t, request.index());
                }
                listener.onFailure(t);
            }
        });
    }
}

然后调用MetaDataCreateIndexService的createIndex()方法,如果能获取到锁信息则直接执行重载的createIndex()方法,否则交给线程池去执行

public class MetaDataCreateIndexService extends AbstractComponent {
    /**
     * Creates the index once the per-index metadata lock is held. When the lock
     * is free it is taken on the calling thread; otherwise the wait (bounded by
     * the request's master node timeout) happens on the MANAGEMENT executor.
     *
     * @param request  the cluster state update request describing the index
     * @param listener receives the response or the failure
     */
    public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
        final Semaphore indexLock = metaDataService.indexMetaDataLock(request.index());

        if (!indexLock.tryAcquire()) {
            // Lock is busy: wait for it on another thread.
            threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
                @Override
                public void doRun() throws InterruptedException {
                    final boolean acquired = indexLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS);
                    if (acquired) {
                        createIndex(request, listener, indexLock);
                    } else {
                        listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    }
                }
            });
            return;
        }

        createIndex(request, listener, indexLock);
    }
}

在重载的createIndex()方法中,通过提交一个更新集群状态的任务来实现创建索引的具体逻辑

public class MetaDataCreateIndexService extends AbstractComponent {
    /**
     * Normalizes the request settings and submits an URGENT cluster state update
     * task that performs the index creation. The given lock is released in each
     * of the task's completion callbacks shown below.
     *
     * @param request  the cluster state update request describing the index
     * @param listener receives the response or the failure
     * @param mdLock   the semaphore passed in by the caller; released when the task finishes
     */
    private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {

        // Rebuild the settings with every key normalized to the index setting prefix.
        ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
        updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
        request.settings(updatedSettingsBuilder.build());

        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]",
                Priority.URGENT,
                new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {

            @Override
            protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
                return new ClusterStateUpdateResponse(acknowledged);
            }

            // Each of the three callbacks below releases the lock before delegating to super.
            @Override
            public void onAllNodesAcked(@Nullable Throwable t) {
                mdLock.release();
                super.onAllNodesAcked(t);
            }

            @Override
            public void onAckTimeout() {
                mdLock.release();
                super.onAckTimeout();
            }

            @Override
            public void onFailure(String source, Throwable t) {
                mdLock.release();
                super.onFailure(source, t);
            }

            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // the actual index creation logic
                // ... (elided in this excerpt)
            }
        });
    }
}

提交StateUpdateTask任务时,会创建一个UpdateTask对象,然后执行其run()方法,即MetaDataCreateIndexService中创建的AckedClusterStateUpdateTask匿名对象

public class InternalClusterService extends AbstractLifecycleComponent<ClusterService> implements ClusterService {
    /**
     * Wraps the update task in an {@code UpdateTask} and hands it to the update
     * tasks executor. Tasks that are {@code TimeoutClusterStateUpdateTask}s are
     * submitted with their timeout and a callback that fails them with a
     * {@code ProcessClusterEventTimeoutException}. Nothing is submitted when the
     * service is not started.
     *
     * @param source     description of the update
     * @param priority   priority of the task
     * @param updateTask the task computing the new cluster state
     */
    public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);
            if (!(updateTask instanceof TimeoutClusterStateUpdateTask)) {
                updateTasksExecutor.execute(task);
                return;
            }

            final TimeoutClusterStateUpdateTask timed = (TimeoutClusterStateUpdateTask) updateTask;
            // Reports the timeout to the task itself.
            final Runnable failOnTimeout = new Runnable() {
                @Override
                public void run() {
                    timed.onFailure(task.source(), new ProcessClusterEventTimeoutException(timed.timeout(), task.source()));
                }
            };
            updateTasksExecutor.execute(task, threadPool.scheduler(), timed.timeout(), new Runnable() {
                @Override
                public void run() {
                    // Hand the failure notification over to the generic thread pool.
                    threadPool.generic().execute(failOnTimeout);
                }
            });
        } catch (EsRejectedExecutionException rejected) {
            // Rejections are swallowed only when the service is stopped or closed.
            if (!lifecycle.stoppedOrClosed()) {
                throw rejected;
            }
        }
    }
}

在UpdateTask的run()方法中,会调用ClusterStateUpdateTask.execute()方法获取新的集群状态,

/**
 * Runnable wrapper around a {@link ClusterStateUpdateTask}.
 * <p>
 * When run, it asks the wrapped task for a new cluster state. If the returned state is a
 * different object than the current one, it is (on the master) versioned and published,
 * installed as the local cluster state, and announced to the registered listeners.
 */
class UpdateTask extends TimedPrioritizedRunnable {

        /** The cluster state update this runnable executes. */
        public final ClusterStateUpdateTask updateTask;


        /**
         * @param source     description of where the update originates; passed to the superclass
         * @param priority   priority of this runnable; passed to the superclass
         * @param updateTask the cluster state update to execute
         */
        UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
            super(priority, source);
            this.updateTask = updateTask;
        }

        /**
         * Executes the wrapped task against the current cluster state and applies the result.
         * <p>
         * Failures thrown by {@code updateTask.execute} are reported through
         * {@code updateTask.onFailure}. Failures while applying the new state are only logged.
         */
        @Override
        public void run() {
            if (!lifecycle.started()) {
                logger.debug("processing [{}]: ignoring, cluster_service not started", source);
                return;
            }
            logger.debug("processing [{}]: execute", source);
            ClusterState previousClusterState = clusterState;
            // Fail the task if it must run on the master but the local node is not the master
            if (!previousClusterState.nodes().localNodeMaster() && updateTask.runOnlyOnMaster()) {
                logger.debug("failing [{}]: local node is no longer master", source);
                updateTask.onNoLongerMaster(source);
                return;
            }
            // The new cluster state
            ClusterState newClusterState;
            long startTimeNS = System.nanoTime();
            try {
                // Call the task's execute method to obtain the new cluster state
                newClusterState = updateTask.execute(previousClusterState);
            } catch (Throwable e) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                if (logger.isTraceEnabled()) {
                    StringBuilder sb = new StringBuilder("failed to execute cluster state update in ").append(executionTime).append(", state:\nversion [").append(previousClusterState.version()).append("], source [").append(source).append("]\n");
                    sb.append(previousClusterState.nodes().prettyPrint());
                    sb.append(previousClusterState.routingTable().prettyPrint());
                    sb.append(previousClusterState.readOnlyRoutingNodes().prettyPrint());
                    logger.trace(sb.toString(), e);
                }
                warnAboutSlowTaskIfNeeded(executionTime, source);
                updateTask.onFailure(source, e);
                return;
            }

            // The cluster state did not change (the task returned the very same instance)
            if (previousClusterState == newClusterState) {
                if (updateTask instanceof AckedClusterStateUpdateTask) {
                    //no need to wait for ack if nothing changed, the update can be counted as acknowledged
                    ((AckedClusterStateUpdateTask) updateTask).onAllNodesAcked(null);
                }
                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                }
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                logger.debug("processing [{}]: took {} no change in cluster_state", source, executionTime);
                warnAboutSlowTaskIfNeeded(executionTime, source);
                return;
            }

            try {
                // Default listener; replaced below when the task wants acknowledgements
                Discovery.AckListener ackListener = new NoOpAckListener();
                // The local node is the master
                if (newClusterState.nodes().localNodeMaster()) {
                    // only the master controls the version numbers
                    Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
                    // Rebuild the routing table with a bumped version if it was replaced
                    if (previousClusterState.routingTable() != newClusterState.routingTable()) {
                        builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
                    }
                    // Rebuild the meta data with a bumped version if it was replaced
                    if (previousClusterState.metaData() != newClusterState.metaData()) {
                        builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
                    }
                    newClusterState = builder.build();

                    if (updateTask instanceof AckedClusterStateUpdateTask) {
                        final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask;
                        // A missing or zero ack timeout is reported as an ack timeout right away
                        if (ackedUpdateTask.ackTimeout() == null || ackedUpdateTask.ackTimeout().millis() == 0) {
                            ackedUpdateTask.onAckTimeout();
                        } else {
                            try {
                                ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool);
                            } catch (EsRejectedExecutionException ex) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex);
                                }
                                //timeout straightaway, otherwise we could wait forever as the timeout thread has not started
                                ackedUpdateTask.onAckTimeout();
                            }
                        }
                    }
                }

                newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);

                if (logger.isTraceEnabled()) {
                    StringBuilder sb = new StringBuilder("cluster state updated, source [").append(source).append("]\n");
                    sb.append(newClusterState.prettyPrint());
                    logger.trace(sb.toString());
                } else if (logger.isDebugEnabled()) {
                    logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
                }

                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
                // new cluster state, notify all listeners
                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
                    String summary = nodesDelta.shortSummary();
                    if (summary.length() > 0) {
                        logger.info("{}, reason: {}", summary, source);
                    }
                }

                // TODO, do this in parallel (and wait)
                for (DiscoveryNode node : nodesDelta.addedNodes()) {
                    if (!nodeRequiresConnection(node)) {
                        continue;
                    }
                    try {
                        transportService.connectToNode(node);
                    } catch (Throwable e) {
                        // the fault detection will detect it as failed as well
                        logger.warn("failed to connect to node [" + node + "]", e);
                    }
                }

                // if we are the master, publish the new state to all nodes
                // we publish here before we send a notification to all the listeners, since if it fails
                // we don't want to notify
                if (newClusterState.nodes().localNodeMaster()) {
                    logger.debug("publishing cluster state version {}", newClusterState.version());
                    discoveryService.publish(newClusterState, ackListener);
                }

                // update the current cluster state
                clusterState = newClusterState;
                logger.debug("set local cluster state to version {}", newClusterState.version());
                for (ClusterStateListener listener : preAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }

                for (DiscoveryNode node : nodesDelta.removedNodes()) {
                    try {
                        transportService.disconnectFromNode(node);
                    } catch (Throwable e) {
                        logger.warn("failed to disconnect to node [" + node + "]", e);
                    }
                }

                newClusterState.status(ClusterState.ClusterStateStatus.APPLIED);

                for (ClusterStateListener listener : postAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }

                //manual ack only from the master at the end of the publish
                if (newClusterState.nodes().localNodeMaster()) {
                    try {
                        ackListener.onNodeAck(newClusterState.nodes().localNode(), null);
                    } catch (Throwable t) {
                        logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode());
                    }
                }

                // Call the task's clusterStateProcessed() method
                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState);
                }

                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                logger.debug("processing [{}]: took {} done applying updated cluster_state (version: {})", source, executionTime, newClusterState.version());
                warnAboutSlowTaskIfNeeded(executionTime, source);
            } catch (Throwable t) {
                TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - startTimeNS)));
                // Include the version of the state that failed to apply, mirroring the execute-failure message above
                StringBuilder sb = new StringBuilder("failed to apply updated cluster state in ").append(executionTime).append(":\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
                sb.append(newClusterState.nodes().prettyPrint());
                sb.append(newClusterState.routingTable().prettyPrint());
                sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
                logger.warn(sb.toString(), t);
                // TODO: do we want to call updateTask.onFailure here?
            }
        }
    }

在索引创建完成后,集群状态信息会发生变化,elasticsearch会将这个变化发布到其他节点,以维持集群统一的状态信息

上一篇 下一篇

猜你喜欢

热点阅读