Elasticsearch Source Code Analysis - Search (Part 3)
0. Introduction
In the previous article we walked through the Query part of the Elasticsearch search process, covering parameter initialization, query parsing, and the Lucene search call.
This article continues with the Fetch phase of a QUERY_THEN_FETCH search, covering document sorting and result merging. The overall flow is:
(1) Sort the query results to determine the doc ids to fetch
(2) Group those ids by shard into the docIdsToLoad object
(3) Fetch the document details from each shard
(4) Merge the fetched documents back in the order established in step (1)
(5) If the request has scroll, build a new scroll id
(6) Return the response to the caller
(7) If this is not a scroll request, release the search context
1. Sorting the query results
- Preparing the data
When a shard's query result arrives in the Query phase, processFirstPhaseResult() stores it in the firstResults array, which is later used for the global first-phase sort.
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
        firstResults.set(shardIndex, result);
        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures;
        if (shardFailures != null) {
            shardFailures.set(shardIndex, null);
        }
    }
}
2. Sorting and truncating the query results
Once all first-phase query results are in, SearchPhaseController.sortDocs() is called to sort the documents.
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        boolean useScroll = !useSlowScroll && request.scroll() != null;
        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
        if (docIdsToLoad.asList().isEmpty()) {
            finishHim();
            return;
        }
        ...
    }
}
If the request's size is N, each shard returns up to N documents, so a global sort is needed to select the overall top N. Elasticsearch delegates this to Lucene:
TopDocs mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs)
which truncates the combined results, keeping only the highest-scoring documents.
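TopDocs.merge essentially performs a k-way merge over the per-shard, score-sorted lists. As a hedged illustration (plain Java, not the actual Lucene API; all names here are mine), the idea can be sketched with a priority queue:

```java
import java.util.*;

// Sketch of a global top-N merge over per-shard, score-sorted hit lists.
// This mimics the idea behind Lucene's TopDocs.merge, not its real API.
public class TopNMerge {
    static class ShardDoc {
        final int shardIndex, doc;
        final float score;
        ShardDoc(int shardIndex, int doc, float score) {
            this.shardIndex = shardIndex; this.doc = doc; this.score = score;
        }
    }

    // shardResults.get(i) must already be sorted by descending score,
    // which is what each shard's query phase guarantees.
    static List<ShardDoc> merge(int from, int topN, List<List<ShardDoc>> shardResults) {
        // Heap entry: {shardIndex, position within that shard's list}; max-heap by score.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> Float.compare(shardResults.get(b[0]).get(b[1]).score,
                                        shardResults.get(a[0]).get(a[1]).score));
        for (int s = 0; s < shardResults.size(); s++) {
            if (!shardResults.get(s).isEmpty()) heap.add(new int[]{s, 0});
        }
        List<ShardDoc> out = new ArrayList<>();
        // Pop from+topN times, discarding the first `from` (pagination offset).
        for (int n = 0; n < from + topN && !heap.isEmpty(); n++) {
            int[] top = heap.poll();
            ShardDoc d = shardResults.get(top[0]).get(top[1]);
            if (n >= from) out.add(d);
            if (top[1] + 1 < shardResults.get(top[0]).size()) {
                heap.add(new int[]{top[0], top[1] + 1});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<ShardDoc>> shards = List.of(
                List.of(new ShardDoc(0, 3, 9.0f), new ShardDoc(0, 7, 4.0f)),
                List.of(new ShardDoc(1, 1, 8.0f), new ShardDoc(1, 2, 5.0f)));
        for (ShardDoc d : merge(0, 3, shards)) {
            System.out.println("shard=" + d.shardIndex + " doc=" + d.doc + " score=" + d.score);
        }
    }
}
```

The merge is cheap because each shard's list is already sorted; only shards * topN heap operations are needed rather than a full re-sort.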
3. Preparing the docs to fetch
With the merged doc ids in hand, the doc information is grouped by shard into the docIdsToLoad object, ready for the fetch step.
public class SearchPhaseController extends AbstractComponent {
    public void fillDocIdsToLoad(AtomicArray<IntArrayList> docsIdsToLoad, ScoreDoc[] shardDocs) {
        for (ScoreDoc shardDoc : shardDocs) {
            IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex);
            if (list == null) {
                list = new IntArrayList();
                docsIdsToLoad.set(shardDoc.shardIndex, list);
            }
            list.add(shardDoc.doc);
        }
    }
}
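fillDocIdsToLoad is plain bucketing: the globally sorted ScoreDocs are regrouped by shardIndex so that each shard gets exactly one fetch request. A minimal runnable sketch, using ordinary Java maps in place of AtomicArray/IntArrayList:

```java
import java.util.*;

public class DocIdsToLoad {
    // Group globally sorted {shardIndex, docId} pairs by shard, preserving
    // the sorted order within each shard's bucket.
    static Map<Integer, List<Integer>> fillDocIdsToLoad(int[][] sortedShardDocs) {
        Map<Integer, List<Integer>> docIdsToLoad = new LinkedHashMap<>();
        for (int[] scoreDoc : sortedShardDocs) {
            int shardIndex = scoreDoc[0], doc = scoreDoc[1];
            docIdsToLoad.computeIfAbsent(shardIndex, k -> new ArrayList<>()).add(doc);
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        // Global top-4 as produced by sortDocs: {shardIndex, docId} pairs.
        int[][] sorted = {{0, 3}, {1, 1}, {1, 2}, {0, 7}};
        System.out.println(fillDocIdsToLoad(sorted)); // {0=[3, 7], 1=[1, 2]}
    }
}
```

Preserving the in-bucket order matters: the merge step later relies on each shard's fetched hits coming back in exactly this order.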
With the set of doc ids to fetch assembled, the coordinator iterates over the shards:
(1) Look up the node hosting the shard and build a fetch request
(2) Call executeFetch() with the shard info, the node info, and the fetch request
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void moveToSecondPhase() throws Exception {
        ...
        for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
            QuerySearchResultProvider queryResult = firstResults.get(entry.index);
            DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
            ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
            executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
        }
    }
}
4. Executing the fetch request
Executing a fetch means sending the fetch request to the target node; once all shard results are back (the counter reaches zero), finishHim() is called to merge the data.
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
    searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
        @Override
        public void onResult(FetchSearchResult result) {
            result.shardTarget(shardTarget);
            fetchResults.set(shardIndex, result);
            if (counter.decrementAndGet() == 0) {
                finishHim();
            }
        }

        @Override
        public void onFailure(Throwable t) {
            docIdsToLoad.set(shardIndex, null);
            onFetchFailure(t, fetchSearchRequest, shardIndex, shardTarget, counter);
        }
    });
}
- Sending the fetch request to the target node
FETCH_ID_ACTION_NAME is "indices:data/read/search[phase/fetch/id]"; Elasticsearch sends this action together with the request to the target node.
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
}
- Choosing where to execute
If the target node is the current node, execute() is called directly:
(1) Run the callable's call() method, i.e. SearchService.executeFetchPhase()
(2) Depending on the outcome, call listener.onFailure() if the result is null, otherwise listener.onResult()
If the target is a different node, the action and request are sent to that node over the transport layer.
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
    if (clusterService.state().nodes().localNodeId().equals(node.id())) {
        execute(new Callable<FetchSearchResult>() {
            @Override
            public FetchSearchResult call() throws Exception {
                return searchService.executeFetchPhase(request);
            }
        }, listener);
    } else {
        transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {
            @Override
            public FetchSearchResult newInstance() {
                return new FetchSearchResult();
            }

            @Override
            public void handleResponse(FetchSearchResult response) {
                listener.onResult(response);
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }

            @Override
            public String executor() {
                return ThreadPool.Names.SAME;
            }
        });
    }
}
- The remote node's handler receives the message
At startup Elasticsearch instantiates the SearchServiceTransportAction object and registers a SearchFetchByIdTransportHandler under FETCH_ID_ACTION_NAME.
public class SearchServiceTransportAction extends AbstractComponent {
    @Inject
    public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
        super(settings);
        transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
        ...
    }
}
SearchFetchByIdTransportHandler only overrides its parent class FetchByIdTransportHandler's newInstance() method; the class hierarchy is shown below:
[Class diagram: SearchFetchByIdTransportHandler]
private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchSearchRequest> {
    @Override
    public ShardFetchSearchRequest newInstance() {
        return new ShardFetchSearchRequest();
    }
}
FetchByIdTransportHandler receives requests from other nodes in messageReceived() and then runs SearchService.executeFetchPhase():
private abstract class FetchByIdTransportHandler<Request extends ShardFetchRequest> extends BaseTransportRequestHandler<Request> {
    public abstract Request newInstance();

    @Override
    public void messageReceived(Request request, TransportChannel channel) throws Exception {
        FetchSearchResult result = searchService.executeFetchPhase(request);
        channel.sendResponse(result);
    }

    @Override
    public String executor() {
        return ThreadPool.Names.SEARCH;
    }
}
5. The fetch logic
The main steps of SearchService.executeFetchPhase() are:
(1) Look up the search context by context id
(2) Put the doc ids to fetch into the context
(3) Run the pre-fetch hooks
(4) Call FetchPhase.execute() to perform the fetch
(5) If this is not a scroll request, free the search context
(6) Record the fetch timing (for the slow-fetch log)
public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public FetchSearchResult executeFetchPhase(ShardFetchRequest request) throws ElasticsearchException {
        final SearchContext context = findContext(request.id());
        contextProcessing(context);
        try {
            if (request.lastEmittedDoc() != null) {
                context.lastEmittedDoc(request.lastEmittedDoc());
            }
            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
            context.indexShard().searchService().onPreFetchPhase(context);
            long time = System.nanoTime();
            fetchPhase.execute(context);
            if (context.scroll() == null) {
                freeContext(request.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time);
            return context.fetchResult();
        } catch (Throwable e) {
            context.indexShard().searchService().onFailedFetchPhase(context);
            logger.trace("Fetch phase failed", e);
            processFailure(context, e);
            throw ExceptionsHelper.convertToRuntime(e);
        } finally {
            cleanContext(context);
        }
    }
}
The fetch phase itself works as follows:
(1) Create a FieldsVisitor: UidAndSourceFieldsVisitor if _source must be fetched, otherwise JustUidFieldsVisitor
(2) For each doc id, check whether the document is nested, then call createNestedSearchHit() or createSearchHit() accordingly to build the SearchHit
(3) Enrich each SearchHit with the results of the sub-phases: ScriptFieldsPhase, PartialFieldsPhase, MatchedQueriesPhase, ExplainPhase, HighlightPhase, FetchSourceSubPhase, VersionPhase, FieldDataFieldsFetchSubPhase, InnerHitsFetchSubPhase
(4) Store the hits array, the total hit count totalHits, and the maximum score maxScore in context.fetchResult().hits
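The sub-phase chain in step (3) follows a simple pipeline pattern: every hit is passed through each enabled sub-phase, which decorates it in place. A hedged sketch of that pattern (the Hit and SubPhase types here are illustrative stand-ins, not the real FetchSubPhase interface):

```java
import java.util.*;

public class SubPhasePipeline {
    // Illustrative stand-in for InternalSearchHit.
    static class Hit { final Map<String, Object> fields = new LinkedHashMap<>(); }

    // Illustrative stand-in for FetchSubPhase.
    interface SubPhase {
        boolean hitExecutionNeeded();   // e.g. only when "explain" was requested
        void hitExecute(Hit hit);       // decorate the hit in place
    }

    static void execute(List<Hit> hits, List<SubPhase> subPhases) {
        for (Hit hit : hits) {
            for (SubPhase phase : subPhases) {
                if (phase.hitExecutionNeeded()) {
                    phase.hitExecute(hit);
                }
            }
        }
    }

    public static void main(String[] args) {
        SubPhase version = new SubPhase() {   // stands in for VersionPhase
            public boolean hitExecutionNeeded() { return true; }
            public void hitExecute(Hit h) { h.fields.put("_version", 1L); }
        };
        SubPhase explain = new SubPhase() {   // disabled: request had no "explain"
            public boolean hitExecutionNeeded() { return false; }
            public void hitExecute(Hit h) { h.fields.put("_explanation", "..."); }
        };
        List<Hit> hits = List.of(new Hit(), new Hit());
        execute(hits, List.of(version, explain));
        System.out.println(hits.get(0).fields); // {_version=1}
    }
}
```

This is why disabled features cost nothing per hit: their hitExecutionNeeded() check short-circuits the whole sub-phase.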
public void execute(SearchContext context) {
    FieldsVisitor fieldsVisitor;
    // instantiate fieldsVisitor and decide whether _source must be fetched
    ...
    InternalSearchHit[] hits = new InternalSearchHit[context.docIdsToLoadSize()];
    FetchSubPhase.HitContext hitContext = new FetchSubPhase.HitContext();
    for (int index = 0; index < context.docIdsToLoadSize(); index++) {
        int docId = context.docIdsToLoad()[context.docIdsToLoadFrom() + index];
        // locate the segment (leaf) that owns this global doc id
        int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
        AtomicReaderContext subReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
        int subDocId = docId - subReaderContext.docBase;

        final InternalSearchHit searchHit;
        try {
            int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
            if (rootDocId != -1) {
                searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, extractFieldNames, loadAllStored, fieldNames, subReaderContext);
            } else {
                searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, extractFieldNames, subReaderContext);
            }
        } catch (IOException e) {
            throw ExceptionsHelper.convertToElastic(e);
        }

        hits[index] = searchHit;
        hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher().getIndexReader());
        for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
            if (fetchSubPhase.hitExecutionNeeded(context)) {
                fetchSubPhase.hitExecute(context, hitContext);
            }
        }
    }
    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        if (fetchSubPhase.hitsExecutionNeeded(context)) {
            fetchSubPhase.hitsExecute(context, hits);
        }
    }
    context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
}
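The readerIndex/subDocId arithmetic deserves a closer look: Lucene numbers documents globally across segments, and each segment (leaf) carries a docBase offset, so the segment-local doc id is docId - docBase. ReaderUtil.subIndex is essentially a binary search over those offsets. A self-contained sketch (simplified signature, operating on a plain int[] of doc bases rather than on leaves()):

```java
import java.util.*;

public class SegmentLookup {
    // docBases[i] = first global doc id owned by segment i (docBases[0] == 0).
    // Returns the index of the segment owning globalDocId,
    // mirroring what ReaderUtil.subIndex does over leaves().
    static int subIndex(int globalDocId, int[] docBases) {
        int lo = 0, hi = docBases.length - 1;
        while (lo < hi) {                       // binary search: last base <= docId
            int mid = (lo + hi + 1) >>> 1;
            if (docBases[mid] <= globalDocId) lo = mid; else hi = mid - 1;
        }
        return lo;
    }

    public static void main(String[] args) {
        int[] docBases = {0, 100, 250};         // e.g. three segments
        int globalDocId = 120;
        int readerIndex = subIndex(globalDocId, docBases);
        int subDocId = globalDocId - docBases[readerIndex];
        System.out.println("segment=" + readerIndex + " localDoc=" + subDocId);
        // segment=1 localDoc=20
    }
}
```

All per-segment readers (stored fields, doc values) only understand local doc ids, which is why the fetch phase must translate before calling loadStoredFields().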
When building a SearchHit, loadStoredFields() reads the stored field data from Lucene:
public class FetchPhase implements SearchPhase {
    private InternalSearchHit createSearchHit(SearchContext context, FieldsVisitor fieldsVisitor, int docId, int subDocId, List<String> extractFieldNames, AtomicReaderContext subReaderContext) {
        // fetch subDocId
        loadStoredFields(context, subReaderContext, fieldsVisitor, subDocId);
        ...
    }

    private void loadStoredFields(SearchContext searchContext, AtomicReaderContext readerContext, FieldsVisitor fieldVisitor, int docId) {
        fieldVisitor.reset();
        try {
            readerContext.reader().document(docId, fieldVisitor);
        } catch (IOException e) {
            throw new FetchPhaseExecutionException(searchContext, "Failed to fetch doc id [" + docId + "]", e);
        }
    }
}
public final class SegmentReader extends AtomicReader implements Accountable {
    @Override
    public void document(int docID, StoredFieldVisitor visitor) throws IOException {
        checkBounds(docID);
        getFieldsReader().visitDocument(docID, visitor);
    }
}
The field data read here is decompressed from Lucene's stored fields, so only fields stored with store=true can be retrieved at this point:
public final class CompressingStoredFieldsReader extends StoredFieldsReader {
    @Override
    public void visitDocument(int docID, StoredFieldVisitor visitor) throws IOException {
        fieldsStream.seek(indexReader.getStartPointer(docID));

        final int docBase = fieldsStream.readVInt();
        final int chunkDocs = fieldsStream.readVInt();
        // remaining decompression details omitted...
    }
}
6. Merging the results
Once the fetch data has been received, listener.onResult() records the fetch result and finishHim() merges the data and returns the response. The steps are:
(1) Call SearchPhaseController.merge() to merge the data
(2) If the request has scroll, regenerate the scroll id for subsequent scroll calls
(3) Call listener.onResponse() to return the response to the caller
(4) If this is not a scroll request, release the search contexts
private void finishHim() {
    threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
        @Override
        public void doRun() throws IOException {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
            releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
        }

        @Override
        public void onFailure(Throwable t) {
            ...
        }
    });
}
The merge logic is:
(1) Compute the maximum score maxScore and the total hit count totalHits across all shards
(2) Walk the sortedDocs order established before the fetch, filling in each entry's shard, score, and fetched SearchHit
(3) Merge the remaining parts, such as suggest, facets, and aggregations
(4) Return the merged result set
public InternalSearchResponse merge(ScoreDoc[] sortedDocs,
                                    AtomicArray<? extends QuerySearchResultProvider> queryResultsArr,
                                    AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
    List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
    List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
    // (1) compute totalHits and maxScore across all shards
    long totalHits = 0;
    float maxScore = Float.NEGATIVE_INFINITY;
    for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults) {
        QuerySearchResult result = entry.value.queryResult();
        totalHits += result.topDocs().totalHits;
        if (!Float.isNaN(result.topDocs().getMaxScore())) {
            maxScore = Math.max(maxScore, result.topDocs().getMaxScore());
        }
    }
    if (Float.isInfinite(maxScore)) {
        maxScore = Float.NaN;
    }
    // reset each shard's fetch-result cursor before consuming hits in order
    for (AtomicArray.Entry<? extends FetchSearchResultProvider> entry : fetchResults) {
        entry.value.fetchResult().initCounter();
    }
    // (2) walk the globally sorted docs and pull each shard's next fetched hit
    List<InternalSearchHit> hits = new ArrayList<>();
    if (!fetchResults.isEmpty()) {
        for (ScoreDoc shardDoc : sortedDocs) {
            FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex);
            if (fetchResultProvider == null) {
                continue;
            }
            FetchSearchResult fetchResult = fetchResultProvider.fetchResult();
            int index = fetchResult.counterGetAndIncrement();
            if (index < fetchResult.hits().internalHits().length) {
                InternalSearchHit searchHit = fetchResult.hits().internalHits()[index];
                searchHit.score(shardDoc.score);
                searchHit.shard(fetchResult.shardTarget());
                if (sorted) {
                    FieldDoc fieldDoc = (FieldDoc) shardDoc;
                    searchHit.sortValues(fieldDoc.fields);
                    if (sortScoreIndex != -1) {
                        searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
                    }
                }
                hits.add(searchHit);
            }
        }
    }
    // (3) merging of suggest, facets and aggregations elided
    InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
    return new InternalSearchResponse(searchHits, facets, aggregations, suggest, timedOut, terminatedEarly);
}
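The per-shard counter is the key trick in the loop above: because the global sortedDocs order and each shard's fetched hits preserve the same relative order per shard, a shard's next hit is always its next unconsumed element. A hedged, self-contained sketch of just that mechanism (plain strings stand in for InternalSearchHit):

```java
import java.util.*;

public class MergeSketch {
    // fetched.get(shard) lists that shard's hits in the same relative order
    // that the shard's docs occupy within sortedShardIndexes.
    static List<String> merge(int[] sortedShardIndexes, List<List<String>> fetched) {
        int[] counters = new int[fetched.size()];   // one cursor per shard
        List<String> hits = new ArrayList<>();
        for (int shard : sortedShardIndexes) {
            List<String> shardHits = fetched.get(shard);
            int index = counters[shard]++;          // counterGetAndIncrement()
            if (index < shardHits.size()) {
                hits.add(shardHits.get(index));
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Global order from sortDocs: shard0, shard1, shard1, shard0.
        int[] order = {0, 1, 1, 0};
        List<List<String>> fetched = List.of(
                List.of("s0-docA", "s0-docB"),      // shard 0's fetched hits, in order
                List.of("s1-docA", "s1-docB"));     // shard 1's fetched hits, in order
        System.out.println(merge(order, fetched));
        // [s0-docA, s1-docA, s1-docB, s0-docB]
    }
}
```

This makes the merge O(n) with no map lookups or re-sorting: the global ranking was fixed before the fetch, and the fetch phase only fills in the document bodies.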
This completes the walkthrough of the full Elasticsearch search flow.