elasticsearchElasticsearch分布式搜索引擎ElasticSearch&Lucene

ES5.6 Bulk源码解析

2017-09-29  本文已影响89人  YG_9013

Bulk注册

在启动类BootStrap的start()方法中,启动了node.start()方法。在Node初始化的过程中,加载了一系列的模块和插件,其中包含ActionModel。

 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
 modules.add(actionModule);

在ActionModel中,注册了我们常用的一些操作action,比如说我们这次解析的BulkAction:

  actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
  actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,TransportShardMultiGetAction.class);
  actions.register(BulkAction.INSTANCE, TransportBulkAction.class,TransportShardBulkAction.class);

并且初始化RestHandler:

 registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
 registerHandler.accept(new RestBulkAction(settings, restController));
 registerHandler.accept(new RestUpdateAction(settings, restController));

在RestBulkAction中规定了我们的查询方式:

  controller.registerHandler(POST, "/_bulk", this);
  controller.registerHandler(PUT, "/_bulk", this);
  controller.registerHandler(POST, "/{index}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/_bulk", this);
  controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

接收到请求

RestBulkAction在prepareRequest方法中将我们普通的RestRequest转化为BulkReqest,并通过NodeClient调用:

 channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));

而在NodeClient的bulk中则是调用了NodeClient的doExecute()方法。

doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener)

传入的Action是BulkAction.Instance,request就是上一步封装的BulkRequest,listener则是监听器。

在doExecute方法中,首先将普通的action转化为tansportAction,然后用转化后的tansportAction执行该请求:

transportAction(action).execute(request, listener);

bulkAction转化后变为TransportBulkAction,而TransportBulkAction的execute方法则是调用本身的doExecute()方法。在doExecut()方法中首先将存在和不存在的索引分类:

1)Step 1: collect all the indices in the request
2)Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create that we'll use when we try to run the requests.
3)Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.

然后执行executeBulk()方法,接着在executeBulk中创建一个BulkOperation,并开始执行该BulkOperation:

void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
        final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

在BulkOperation中存在两次遍历Bulk中所有的请求,第一次遍历则将给该请求设置Routing,Mapping等等,如果允许产生ID,则自动生成ID。第二次遍历则是根据shardID将请求分类。ES官网有说到批量处理时让用bulk,原因是bulk处理请求时做了一些底层的优化。这就是一个优化点,将同一个shard的请求集合在一起直接发送到节点对应的shard,避免请求在节点间传递,影响效率。

for (int i = 0; i < bulkRequest.requests.size(); i++) {
    ....
    switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            ....
    }
....
}

....
for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
             }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
         }

然后针对不同的shardRequest,分别用shardBulkAction处理:

shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {}

每个shard的处理流程

接下来就是复杂的类继承关系了:

TransportShardBulkAction>TransportWriteAction >TransportReplicationAction>TransportAction

上一步的shardBulkAction.execute()方法则是执行的TransportAction的execute方法。我看得源码版本是5.6版本的,与5.0版本相比,ES增加了一个
TransportWriteAction类,而且在TransportReplicationAction不是直接运行run方法,而是通过transportService的RPC接口在实现功能。具体的流程如下:

1)TransportAction.execute()方法会调用TransportReplicationAction的doExecute()方法

2)在TransportReplicationAction的doExecute()方法中执行ReroutePhase的run方法,run方法中根据请求的shardID获取到primary shardID,同时得到primary shard的NodeID,如果当前节点包含primary shard,则执行performLocalAction方法,否则执行performRemoteAction。

3)performLocalAction和performRemoteAction最终都将执行performAction方法,在performAction中我们可以看到,transportService发送请求:

transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {}

4)transportService接收到请求后用的PrimaryOperationTransportHandler处理,至于PrimaryOperationTransportHandler是在TransportReplicationAction中注册的:

transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());

5)PrimaryOperationTransportHandler则是一个primary操作的处理类,在这个类接收到信息之后调用AsyncPrimaryAction处理:

@Override
    public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
        new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
    }

6)在AsyncPrimaryAction中首先获取shard锁,如果成功的获取到锁则调用自身的onresponse()方法,否则将获取操作加入线程池:

            synchronized (this) {
            releasable = tryAcquire();
            if (releasable == null) {
                // blockOperations is executing, this operation will be retried by blockOperations once it finishes
                if (delayedOperations == null) {
                    delayedOperations = new ArrayList<>();
                }
                final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                if (executorOnDelay != null) {
                    delayedOperations.add(
                        new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                            new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                } else {
                    delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                }
                return;
            }
        }

7)在onresponse中,如果该primaryShardReference已经被移动了,则获取到正确的primary shard和nodeID重新发送请求。否则就用primaryShardReference直接处理:

 @Override
    public void onResponse(PrimaryShardReference primaryShardReference) {
        try {
            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
                // delegate primary phase to relocation target
                // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                final ShardRouting primary = primaryShardReference.routingEntry();
                assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                transportService.sendRequest(relocatingNode, transportPrimaryAction,
                    new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                    transportOptions,
                    new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                        TransportReplicationAction.this::newResponseInstance) {

                        @Override
                        public void handleResponse(Response response) {
                            setPhase(replicationTask, "finished");
                            super.handleResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            setPhase(replicationTask, "finished");
                            super.handleException(exp);
                        }
                    });
            } else {
                setPhase(replicationTask, "primary");
                final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
                final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
                final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference, executeOnReplicas)
                        .execute();
            }
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
            onFailure(e);
        }
    }

8)createReplicatedOperation看名字还以为直接就是副本处理了,点进去看了之后才发现是先执行primary,后执行replia。

 primaryResult = primary.perform(request);
    ...
 performOnReplicas(replicaRequest, shards);

主分片处理

主分片的处理调用的是PrimaryShardReference.perform()方法,在该方法中则是调用shardOperationOnPrimary()进行主分片的处理。

shardOperationOnPrimary()方法则是由TransportShardBulkAction来实现的,具体执行的步骤如下:

1)获取节点中所有的索引元数据

2)获取版本号

3)更新mapping

4)调用Engin底层的代码。比如说primary.delete(delete),primary.index(operation)等等。

5)写到tanslog中

副本分片和主分片类似,这里就不做过多解释。

上一篇下一篇

猜你喜欢

热点阅读