
Elasticsearch Source Code Analysis: Search Analysis (Part 3)

2018-12-06  尹亮_36cd

0. Introduction

In the previous article we covered the Query part of the Elasticsearch search process, including parameter initialization, query parsing, and the Lucene search call.
This article continues with the Fetch phase of a QUERY_THEN_FETCH search, covering document sorting and result merging. The overall flow is as follows (a condensed sketch follows the list):
(1) Sort the query results globally to obtain the doc ids to fetch
(2) Group those ids by shard into the docIdsToLoad object
(3) Fetch the document details from each shard
(4) Merge the fetched documents back into the order established in step 1
(5) If the request includes scroll, rebuild the scroll id
(6) Return the response data to the caller
(7) If it is not a scroll request, release the search context
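As a roadmap, here is a condensed sketch of the driver behind these steps (the method names are the real ones analyzed below; the glue code, error handling, and scroll-specific details are simplified away):

// Simplified from TransportSearchQueryThenFetchAction.moveToSecondPhase()
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);  // step (1)
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);      // step (2)
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
    // step (3): one fetch request per shard that contributed to the global top-N
    executeFetch(entry.index, shardTarget, counter, fetchSearchRequest, node);
}
// finishHim() runs once the last shard responds, covering steps (4) through (7)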

1. Sorting the Query results

  1. Preparing the data
    After a shard's query result is received in the Query phase, processFirstPhaseResult() is called to store it in the firstResults object, which is used for the phase-one global sort.
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
        // store the per-shard query result at its shard's slot, to be sorted globally later
        firstResults.set(shardIndex, result);
        // a successful result clears any failure previously recorded for this shard
        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
        if (shardFailures != null) {
            shardFailures.set(shardIndex, null);
        }
    }
}
  2. Sorting and truncating the query results
    Once the first-phase Query results are available, SearchPhaseController.sortDocs() is called to sort the documents.
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        boolean useScroll = !useSlowScroll && request.scroll() != null;
        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
        if (docIdsToLoad.asList().isEmpty()) {
            finishHim();
            return;
        }
        ...
    }
}

Suppose the request's size is N. Each shard then returns up to N candidate documents, so a global sort is needed to select the overall top N. Elasticsearch delegates this to Lucene:

TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs)

which truncates the merged stream, keeping only the highest-ranked documents.
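As a hedged illustration of that truncation (using the Lucene 4.x API this version of Elasticsearch builds on; the doc ids and scores are made up), merging two shards' top-2 lists into a global top-3 looks like this:

import java.io.IOException;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

static TopDocs globalTop3() throws IOException {
    // two per-shard result lists, each already sorted by score descending
    TopDocs shard0 = new TopDocs(100, new ScoreDoc[] {
            new ScoreDoc(5, 3.2f), new ScoreDoc(9, 1.1f) }, 3.2f);
    TopDocs shard1 = new TopDocs(80, new ScoreDoc[] {
            new ScoreDoc(2, 2.7f), new ScoreDoc(7, 0.9f) }, 2.7f);
    // sort == null merges by score; from == 0, topN == 3.
    // merge() also stamps each returned ScoreDoc with the position of its
    // source shard in the input array -- the shardIndex that
    // fillDocIdsToLoad() relies on below.
    return TopDocs.merge(null, 0, 3, new TopDocs[] { shard0, shard1 });
    // result order: shard0 doc 5 (3.2), shard1 doc 2 (2.7), shard0 doc 9 (1.1)
}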

2. Preparing the docs to fetch

With the merged doc ids in hand, the doc info is grouped by shard into the docsIdsToLoad object, ready for fetching (see the sketch after the code block below):

public class SearchPhaseController extends AbstractComponent {
    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
        for (ScoreDoc shardDoc : shardDocs) {
            IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
            if (list == null) {
                list = new IntArrayList();
                docsIdsToLoad.set(shardDoc.shardIndex, list);
            }
            list.add(shardDoc.doc);
        }
    }
}
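A minimal, dependency-free sketch of the same grouping (plain JDK types stand in for ES's AtomicArray and hppc's IntArrayList; the shard/doc values continue the merge example above):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// input: {shardIndex, docId} pairs in global sort order
static Map<Integer, List<Integer>> groupByShard(int[][] sortedDocs) {
    Map<Integer, List<Integer>> docIdsToLoad = new HashMap<>();
    for (int[] d : sortedDocs) {
        // same idea as fillDocIdsToLoad(): create each per-shard list lazily
        docIdsToLoad.computeIfAbsent(d[0], k -> new ArrayList<>()).add(d[1]);
    }
    return docIdsToLoad;
}
// groupByShard(new int[][] {{0, 5}, {1, 2}, {0, 9}}) returns {0=[5, 9], 1=[2]}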

With the set of doc ids to fetch in place, we iterate shard by shard:
(1) Look up the node hosting the shard, then create the fetch request (the lastEmittedDocPerShard argument is only populated for scroll requests)
(2) Call executeFetch() to fetch the documents, passing the shard info, node info, and the fetch request object

public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        ...
        for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
            QuerySearchResultProvider queryResult = firstResults.get(entry.index);
            DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
            executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
        }
    }
}

3. Executing the fetch request

Executing the fetch means sending the fetch request to the designated node; once the result is back, finishHim() is called to merge the data. Note the AtomicInteger counter: every shard response decrements it, and only the response that brings it to zero triggers the merge (a standalone sketch of this countdown pattern follows the code).

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
        @Override
        public void onResult(FetchSearchResult result) {
            result.shardTarget(shardTarget);
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }
        @Override
        public void onFailure(Throwable t) {
            docIdsToLoad.set(shardIndex, null);
            onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
        }
    });
}
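The counter is a classic fan-out/fan-in completion check. A minimal, self-contained sketch (illustrative only, not ES code) showing that exactly one callback, the last to finish, triggers the final step:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CountdownDemo {
    public static void main(String[] args) {
        int shards = 3;
        AtomicInteger counter = new AtomicInteger(shards);
        ExecutorService pool = Executors.newFixedThreadPool(shards);
        for (int i = 0; i < shards; i++) {
            final int shardIndex = i;
            pool.execute(() -> {
                // ... per-shard fetch work would happen here ...
                System.out.println("fetch done for shard " + shardIndex);
                // decrementAndGet() is atomic, so exactly one task observes zero
                if (counter.decrementAndGet() == 0) {
                    System.out.println("all shards fetched -> finishHim()");
                }
            });
        }
        pool.shutdown();
    }
}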
  1. Sending the fetch request to the designated node
    The FETCH_ID_ACTION_NAME constant is "indices:data/read/search[phase/fetch/id]"; Elasticsearch sends this action together with the request to the target node.
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
}
  2. Deciding which node executes the request
    If the target node is the current node, the execute() method is invoked directly:
    (1) The callable's call() method runs, i.e. SearchService's executeFetchPhase()
    (2) Depending on the result, listener.onFailure() is invoked if it is null, otherwise listener.onResult()
    If the target node is a different node, the action and request are sent to that node over the transport layer.
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    if (clusterService.state().nodes().localNodeId().equals(node.id())) {
        execute(new Callable<FetchSearchResult>() {
            @Override
            public FetchSearchResult call() throws Exception {
                return searchService.executeFetchPhase(request);
            }
        }, listener);
    } else {
        transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {
            @Override
            public FetchSearchResult newInstance() {
                return new FetchSearchResult();
            }
            @Override
            public void handleResponse(FetchSearchResult response) {
                listener.onResult(response);
            }
            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }
        });
    }
}
  3. The remote node's handler receives the message
    When Elasticsearch starts up, the SearchServiceTransportAction object is instantiated, and a SearchFetchByIdTransportHandler is registered under FETCH_ID_ACTION_NAME.
public class SearchServiceTransportAction extends AbstractComponent {
    @Inject
    public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
        super(settings);
        transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
        ...
    }
}

SearchFetchByIdTransportHandler merely overrides the newInstance() method of its parent class FetchByIdTransportHandler, as the class diagram shows:

[Class diagram: SearchFetchByIdTransportHandler.png]
private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchSearchRequest> {
    @Override
    public ShardFetchSearchRequest newInstance() {
        return new ShardFetchSearchRequest();
    }
}

FetchByIdTransportHandler receives requests from other nodes via its messageReceived() method and then runs SearchService's executeFetchPhase():

private abstract class FetchByIdTransportHandler<Request extends ShardFetchRequest> extends BaseTransportRequestHandler<Request> {
    public abstract Request newInstance();
    @Override
    public void messageReceived(Request request, TransportChannel channel) throws Exception {
        FetchSearchResult result = searchService.executeFetchPhase(request);
        channel.sendResponse(result);
    }
    @Override
    public String executor() {
        return ThreadPool.Names.SEARCH;
    }
}

4. The fetch logic

The main logic of SearchService's executeFetchPhase() is:
(1) Look up the search context by context id
(2) Put the doc ids to fetch into the context
(3) Run the pre-fetch hook
(4) Call FetchPhase.execute() to perform the fetch
(5) If this is not a scroll request, release the search context
(6) Record the fetch timing, used for the slow-fetch log

public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public FetchSearchResult executeFetchPhase(ShardFetchRequest request) throws ElasticsearchException {
        final SearchContext context = findContext(request.id());
        contextProcessing(context);
        try {
            if (request.lastEmittedDoc() != null) {
                context.lastEmittedDoc(request.lastEmittedDoc());
            }
            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
            context.indexShard().searchService().onPreFetchPhase(context);
            long time = System.nanoTime();
            fetchPhase.execute(context);
            if (context.scroll() == null) {
                freeContext(request.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time);
            return context.fetchResult();
        } catch (Throwable e) {
            context.indexShard().searchService().onFailedFetchPhase(context);
            logger.trace("Fetch phase failed", e);
            processFailure(context, e);
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }
}

Inside the fetch phase itself, the logic is:
(1) Create a FieldsVisitor: a UidAndSourceFieldsVisitor or a JustUidFieldsVisitor, depending on whether _source must be fetched
(2) Iterate over the documents to fetch; for each one, determine whether it is a nested document, then call createNestedSearchHit() or createSearchHit() accordingly to build the SearchHit
(3) Enrich each SearchHit with the results of the fetch sub-phases: ScriptFieldsPhase, PartialFieldsPhase, MatchedQueriesPhase, ExplainPhase, HighlightPhase, FetchSourceSubPhase, VersionPhase, FieldDataFieldsFetchSubPhase, InnerHitsFetchSubPhase
(4) Put the hits array, the total hit count totalHits, and the maximum score maxScore into the context.fetchResult().hits object

public void execute(SearchContext context) {
    FieldsVisitor fieldsVisitor;
    // instantiate fieldsVisitor and decide whether _source needs to be fetched
    ...
    InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()]; 
    FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();
    for (int index = 0; index < context.docIdsToLoadSize(); index++) {
        int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
        int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
        AtomicReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
        int subDocId = docId - subReaderContext.docBase;

        final InternalSearchHit searchHit;
        try {
            int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
            if (rootDocId != -1) {
                searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, extractFieldNames, loadAllStored, fieldNames, subReaderContext);
            } else {
                searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, extractFieldNames, subReaderContext);
            }
        } catch (IOException e) {
            throw ExceptionsHelper.convertToElastic(e);
        }
        hits[index] = searchHit;
        hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher().getIndexReader());
        for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
            if (fetchSubPhase.hitExecutionNeeded(context)) {
                fetchSubPhase.hitExecute(context, hitContext);
            }
        }
    }

    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        if (fetchSubPhase.hitsExecutionNeeded(context)) {
            fetchSubPhase.hitsExecute(context, hits);
        }
    }
    context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
}
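Note how a top-level doc id is mapped to a segment-local one in the loop above: ReaderUtil.subIndex() locates the leaf (segment) containing the id, and subtracting that leaf's docBase yields the id within the segment. A hedged re-implementation of the idea with made-up docBase values:

// conceptual equivalent of ReaderUtil.subIndex() plus the docBase subtraction
// (hypothetical numbers, illustrative only)
int[] docBases = {0, 100, 250};   // one entry per leaf, in index order
int docId = 260;                  // top-level doc id from docIdsToLoad
int readerIndex = 0;
for (int i = 0; i < docBases.length; i++) {
    if (docBases[i] <= docId) {
        readerIndex = i;          // last leaf whose docBase <= docId
    }
}
int subDocId = docId - docBases[readerIndex];  // 260 - 250 = 10, i.e. leaf 2, local id 10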

When building a SearchHit, loadStoredFields() reads the stored field values out of Lucene:

public class FetchPhase implements SearchPhase {
    private InternalSearchHit createSearchHit(SearchContext context, FieldsVisitor fieldsVisitor, int docId, int subDocId, List<String> extractFieldNames, AtomicReaderContext subReaderContext) {
        // fetch subDocId
        loadStoredFields(context, subReaderContext, fieldsVisitor, subDocId);
        ...
    }
    private void loadStoredFields(SearchContext searchContext, AtomicReaderContext readerContext, FieldsVisitor fieldVisitor, int docId) {
        fieldVisitor.reset();
        try {
            readerContext.reader().document(docId, fieldVisitor); 
        } catch (IOException e) {
            throw new FetchPhaseExecutionException(searchContext, "Failed to fetch doc id [" + docId + "]", e);
        }
    }
}
public final class SegmentReader extends AtomicReader implements Accountable {
  @Override
  public void document(int docID, StoredFieldVisitor visitor) throws IOException {
    checkBounds(docID);
    getFieldsReader().visitDocument(docID, visitor);
  }
}

The field values read here are decompressed from Lucene's stored-fields data; only fields whose store attribute is true can be retrieved this way.

public final class CompressingStoredFieldsReader extends StoredFieldsReader {
  @Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {
    fieldsStream.seek(indexReader.getStartPointer(docID));

    final int docBase = fieldsStream.readVInt();
    final int chunkDocs = fieldsStream.readVInt();
    // remainder of the method omitted...
  }
}
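To make the store restriction concrete, here is a hedged indexing-side sketch (the field names are made up): only the field declared with Store.YES comes back through reader.document(docId, visitor); the other stays searchable but is not retrievable in the fetch phase. (In Elasticsearch, _source is itself a stored field, which is why documents are normally recoverable even when individual fields are not stored.)

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;

static Document storedVsIndexedOnly() {
    Document doc = new Document();
    doc.add(new StringField("id", "42", Store.YES));        // stored: visible to the FieldsVisitor
    doc.add(new TextField("body", "some text", Store.NO));  // indexed only: skipped by visitDocument()
    return doc;
}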

5. Merging the results

Once the fetch data has been received, listener.onResult() stores the fetch result and finishHim() merges the data and returns the response. Specifically:
(1) SearchPhaseController's merge() method merges the data
(2) If the request includes scroll, a new scroll id is generated for the next scroll round
(3) listener.onResponse() returns the response data to the caller
(4) If this is not a scroll request, the search context is released

private void finishHim() {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
            String scrollId = null;

            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));

            releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
        }

        @Override
        public void onFailure(Throwable t) {
            ...
        }
    });
}

The merge logic is:
(1) Compute the maximum score maxScore and the total hit count totalHits across shards
(2) Walk sortedDocs, the global order fixed before the fetch started, filling in the shard, score, and fetched SearchHit for each entry
(3) Merge the remaining information: suggest, facets, aggregations, and so on
(4) Return the merged result set
Each shard returns its fetched hits in the same order in which their doc ids were requested, which in turn follows the sortedDocs order, so a per-shard cursor (counterGetAndIncrement()) is enough to realign the fetched hits with the global order. Reusing the running example, sortedDocs [(shard 0, doc 5), (shard 1, doc 2), (shard 0, doc 9)] yields the hits in exactly that order.

public InternalSearchResponse merge(ScoreDoc[] sortedDocs,
              AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
              AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
    // unwrap the atomic arrays into lists (elided from the original excerpt)
    List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
    List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();

    long totalHits = 0;
    float maxScore = Float.NEGATIVE_INFINITY;
    for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
        QuerySearchResult result = entry.value.queryResult();
        totalHits += result.topDocs().totalHits;
        if (!Float.isNaN(result.topDocs().getMaxScore())) {
            maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
        }
    }
    if (Float.isInfinite(maxScore)) {
        maxScore = Float.NaN;
    }
    for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
        entry.value.fetchResult().initCounter();
    }
    List<InternalSearchHit> hits = new ArrayList<>();
    if (!fetchResults.isEmpty()) {
        for (ScoreDoc shardDoc : sortedDocs) {
            FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
            if (fetchResultProvider == null) {
                continue;
            }
            FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
            int index = fetchResult.counterGetAndIncrement();
            if (index < fetchResult.hits().internalHits().length) {
                InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
                searchHit.score(shardDoc.score);
                searchHit.shard(fetchResult.shardTarget());
                if (sorted) {
                    FieldDoc fieldDoc = (FieldDoc) shardDoc;
                    searchHit.sortValues(fieldDoc.fields);
                    if (sortScoreIndex != -1) {
                        searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                    }
                }
                hits.add(searchHit);
            }
        }
    }
    InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);

    return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut, terminatedEarly);
}

This completes our walkthrough of the full Elasticsearch search process.
