ESelasticsearch源码分析

Elasticsearch源码分析-搜索分析(二)

2018-12-06  本文已影响0人  尹亮_36cd

0. 前言

在上一篇文章中主要讲述了elasticsearch搜索过程的第一部分,主要逻辑包括:
1.接收search请求,然后对请求进行转发
2.匹配相应的action,对请求参数进行解析
3.根据search_type获取对应的TransportSearchTypeAction,执行具体的搜索逻辑

本文将接着上篇文章,以QUERY_THEN_FETCH为例,讲解搜索的QUERY阶段具体逻辑

1. 搜索参数初始化

上篇文章可知,如果search_type为空(默认)或者为"query_then_fetch",那么elasticsearch会调用TransportSearchTypeAction的子类TransportSearchQueryThenFetch的execute()方法,以获取搜索结果,类图如下:

TransportSearchQueryThenFetchClass.png

从类图中可以看出,方法的执行顺序为:


TransportSearchQueryThenFetchSeg.png

因此elasticsearch的搜索过程,就是调用TransportSearchQueryThenFetchAction的doExecute()方法,具体就是构造AsyncAction对象,然后执行其start()方法

public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }
    
    private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
        final AtomicArray<FetchSearchResult> fetchResults;
        final AtomicArray<IntArrayList> docIdsToLoad;
        private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
            super(request, listener);
            fetchResults = new AtomicArray<>(firstResults.length());
            docIdsToLoad = new AtomicArray<>(firstResults.length());
        }
    }
}

AsyncAction和BaseAsyncAction以及TransportSearchQueryThenFetchAction类图如下:

TransportSearchQueryThenFetchClass_1.png
从上图和代码我们可以看到,执行AsyncAction的start()方法其实是执行BaseAsyncAction的start()方法,在构造AsyncAction时
(1)调用父类BaseAsyncAction的构造方法
(2)初始化两个数组,即fetchResults和docIdsToLoad
fetchResults: 用来存储Fetch完成后shardIndex和对应的FetchResult
docIdsToLoad: 用来存储Query完成后shardIndex和待Fetch的文档id集合DocId

BaseAsyncAction的构造方法代码如下:

protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
            ...
            String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
            Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
            shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
            expectedSuccessfulOps = shardsIts.size();
            expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
            firstResults = new AtomicArray<>(shardsIts.size());
            ...
    }
}

在父类BaseAsyncAction的构造方法中,主要初始化了如下数据:
clusterState: 集群状态信息
nodes: 集群节点信息
concreteIndices: request中的索引转化成实际要搜索的索引
shardsIts: 需要搜索的index的shard,包括一些shard偏好
expectedSuccessfulOps: 所需执行的shard总数
expectedTotalOps: 期望多少个shard给出响应
firstResults: 用来存储每个shard的Query结果集

搜索请求使用preference参数支持搜索偏好,具体代码如下:

private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) {
    preferenceType = Preference.parse(preference);
    switch (preferenceType) {
        case PREFER_NODE:
            return indexShard.preferNodeActiveInitializingShardsIt(preference.substring(Preference.PREFER_NODE.type().length() + 1));
        case LOCAL:
            return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
        case PRIMARY:
            return indexShard.primaryActiveInitializingShardIt();
        case PRIMARY_FIRST:
            return indexShard.primaryFirstActiveInitializingShardsIt();
        case ONLY_LOCAL:
            return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
        case ONLY_NODE:
            String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
            ensureNodeIdExists(nodes, nodeId);
            return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
        default:
            throw new ElasticsearchIllegalArgumentException("unknown preference [" + preferenceType + "]");
    }
}

preference类型具体的含义如下:
_prefer_node: 优选使用提供的节点标识
_local: 查询将优先在本地分配的分片上执行
_primary: 查询将在主分片上执行, 如果不可用,将在其他分片上执行
_primary_first: 该查询将仅在主分片上执行
_only_local: 查询将仅在本地分配的分片上执行
_only_node: 将查询限制为仅在具有提供的节点标识的节点上执行

在调用AsyncAction的构造方法完成参数初始化之后,就会调用BaseAsyncAction的start()开始搜索

2. 搜索QUERY整体流程

start()的逻辑主要是遍历每一个要搜索的shard,对每个shard都去调用performFirstPhase()方法获取搜索结果

protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    public void start() {
        ...
        int shardIndex = -1;
        for (final ShardIterator shardIt : shardsIts) {
            shardIndex++;
            final ShardRouting shard = shardIt.nextOrNull();
            if (shard != null) {
                performFirstPhase(shardIndex, shardIt, shard);
            }
            ...
        }
    }

在performFirstPhase()中主要是获取要搜索shard所在的节点,然后在指定的节点上执行搜索,拿到搜索结果后调用onFirstPhaseResult()方法获取待fetch的doc id集合,然后判断是否可以进入第二阶段执行Fetch

protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
        if (shard == null) {
            ...
        } else {
            final DiscoveryNode node = nodes.get(shard.currentNodeId());
            if (node == null) {
                ...
            } else {
                String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
                sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime(), useSlowScroll), new SearchServiceListener<FirstResult>() {
                    @Override
                    public void onResult(FirstResult result) {
                        onFirstPhaseResult(shardIndex, shard, result, shardIt);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
                    }
                });
            }
        }
    }

在onFirstPhaseResult()中主要是通过processFirstPhaseResult()方法,将第一阶段的Query结果放入firstResults对象中,然后判断已经获得结果的shard数是否和期望得到响应的shard数一致,如果一致,则进入第二阶段执行Fetch获取文档详情

protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
        result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
        processFirstPhaseResult(shardIndex, shard, result);
        successfulOps.incrementAndGet();
        final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
        if (xTotalOps == expectedTotalOps) {
            try {
                innerMoveToSecondPhase();
            } catch (Throwable e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
                }
                raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
            }
        } 
        ...
    }
}

3.在shard上进行搜索

通过上一部分,我们得知elasticsearch会获取要搜索的shard所在的节点,然后通过sendExecuteFirstPhase()方法将search request发送到该节点执行Query,以获取相应的search结果,且sendExecuteFirstPhase会被不同的search_type重写。

1.QUERY_THEN_FETCH的sendExecuteFirstPhase会继续调用SearchServiceTransportAction.sendExecuteQuery继续发送search request。

private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
    @Override
    protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
        searchService.sendExecuteQuery(node, request, listener);
    }
}

2.在SearchServiceTransportAction的sendExecuteQuery()方法中,会首先判断要请求的节点是否是当前节点:
2.1 如果是当前节点,则直接执行execute()方法。在execute()方法中,线程池首先会执行入参传的callable的call()方法,根据执行的结果,如果为null则执行listener的onFailure()方法,否则执行onResult()方法
在callable的call()方法中,直接执行SearchService的executeQueryPhase()方法。

public class SearchServiceTransportAction extends AbstractComponent {
    public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QuerySearchResultProvider> listener) {
        if (clusterService.state().nodes().localNodeId().equals(node.id())) {
            execute(new Callable<QuerySearchResultProvider>() {
                @Override
                public QuerySearchResultProvider call() throws Exception {
                    return searchService.executeQueryPhase(request);
                }
            }, listener);
        } else {
            ...
        }
    }

    private <T> void execute(final Callable<? extends T> callable, final SearchServiceListener<T> listener) {
        try {
            threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
                @Override
                public void run() {
                    T result = null;
                    Throwable error = null;
                    try {
                        result = callable.call();
                    } catch (Throwable t) {
                        error = t;
                    } finally {
                        if (result == null) {
                            assert error != null;
                            listener.onFailure(error);
                        } else {
                            assert error == null : error;
                            listener.onResult(result);
                        }
                    }
                }
            });
        } catch (Throwable t) {
            listener.onFailure(t);
        }
    }
}

2.2 如果要search的不是当前node,则需要将request发送到指定的node上,该方法带了两个比较重要的入参QUERY_ACTION_NAME和BaseTransportResponseHandler对象

transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResultProvider>() {
        @Override
        public QuerySearchResult newInstance() {
            return new QuerySearchResult();
        }
        @Override
        public void handleResponse(QuerySearchResultProvider response) {
            listener.onResult(response);
        }
        @Override
        public void handleException(TransportException exp) {
            listener.onFailure(exp);
        }
        @Override
        public String executor() {
            return ThreadPool.Names.SAME;
        }
    });

在elasticsearch启动时,会注入SearchServiceTransportAction,会将QUERY_ACTION_NAME注册到SearchQueryTransportHandler上

public class SearchServiceTransportAction extends AbstractComponent {
    @Inject
    public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
        super(settings);
        transportService.registerHandler(QUERY_ACTION_NAME, new SearchQueryTransportHandler());
        ...
    }
}

在SearchQueryTransportHandler接收到消息之后,也会执行SearchService的executeQueryPhase()方法获取search结果,这种方式实现了本地和远程搜索

private class SearchQueryTransportHandler extends BaseTransportRequestHandler<ShardSearchTransportRequest> {
    @Override
    public ShardSearchTransportRequest newInstance() {
        return new ShardSearchTransportRequest();
    }
    @Override
    public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
        QuerySearchResultProvider result = searchService.executeQueryPhase(request);
        channel.sendResponse(result);
    }

    @Override
    public String executor() {
        return ThreadPool.Names.SEARCH;
    }
}

4.搜索Query逻辑

在SearchService的executeQueryPhase()中,主要步骤是:
(1) 创建search context
(2) 预处理query
(3) 加载缓存或者查询lucene
(4) 如果search_type是count,则释放search context
(5) 记录慢query日志

public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
        final SearchContext context = createAndPutContext(request);
        try {
            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            loadOrExecuteQueryPhase(request, context, queryPhase);
            if (context.searchType() == SearchType.COUNT) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);

            return context.queryResult();
        } catch (Throwable e) {
            ...
        } finally {
            cleanContext(context);
        }
    }
}

在调用createAndPutContext创建context时,主要操作如下:
(1) 根据index shard获取对应的lucene searcher,即lucene的搜索对象
(2) 设置scroll,解析source,设置from和size
(3) query预处理,将elasticsearch的query转化为lucene的query
(4) search context默认保存5分钟

在创建完context后,开始调用loadOrExecuteQueryPhase执行查询,如果search_type是COUNT且满足cache的条件,则从cache获取结果,否则调用QueryPhase的execute()方法执行lucene搜索

public class QueryPhase implements SearchPhase {
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        boolean rescore = false;
        try {
            searchContext.queryResult().from(searchContext.from());
            searchContext.queryResult().size(searchContext.size());

            Query query = searchContext.query();
            TopDocs topDocs;
            int numDocs = searchContext.from() + searchContext.size();
            if (searchContext.searchType() == SearchType.COUNT || numDocs == 0) {
                TotalHitCountCollector collector = new TotalHitCountCollector();
                searchContext.searcher().search(query, collector);
                topDocs = new TopDocs(collector.getTotalHits(),
                Lucene.EMPTY_SCORE_DOCS, 0);
            } else if (searchContext.searchType() == SearchType.SCAN) {
                topDocs = searchContext.scanContext().execute(searchContext);
            } else {
                if (!searchContext.useSlowScroll() && searchContext.request().scroll() != null) {
                    
                } else {
                    if (searchContext.sort() != null) {
                        topDocs = searchContext.searcher().search(
                                    query, 
                                    null,
                                    numDocs, searchContext.sort(),
                                    searchContext.trackScores(),
                                    searchContext.trackScores());
                    } else {
                        topDocs = searchContext.searcher().search(query, numDocs);
                    }
                }
            }
            searchContext.queryResult().topDocs(topDocs);
        } catch (Throwable e) {
            throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
        } finally {
                                    searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY);
        }
        if (rescore) { // only if we do a regular search
            rescorePhase.execute(searchContext);
        }
        suggestPhase.execute(searchContext);
        facetPhase.execute(searchContext);
        aggregationPhase.execute(searchContext);
    }
}

lucene search完成后将top docs放入search context中,至此已经完成了一个shard的搜索。如果已经搜索shard数和期望要执行的shard数相等,那么执行innerMoveToSecondPhase()方法进入第二阶段执行Fetch,具体逻辑将会在第三篇文章中介绍。

上一篇 下一篇

猜你喜欢

热点阅读