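Elasticsearch Source Code Analysis: Search Analysis (Part 2)
0. Preface
The previous article covered the first part of the Elasticsearch search process. Its main logic was:
1. Receive the search request and forward it
2. Match the corresponding action and parse the request parameters
3. Obtain the TransportSearchTypeAction that corresponds to search_type and run the actual search logic
This article picks up where the previous one left off and uses QUERY_THEN_FETCH as the example to walk through the logic of the QUERY phase of a search.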
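1. Initializing the search parameters
As the previous article showed, when search_type is empty (the default) or "query_then_fetch", Elasticsearch calls the execute() method of TransportSearchQueryThenFetchAction, a subclass of TransportSearchTypeAction, to obtain the search results. The class diagram is as follows:
The class diagram shows the order in which the methods are executed:
[Figure: TransportSearchQueryThenFetchSeg.png]
The Elasticsearch search process therefore comes down to calling doExecute() on TransportSearchQueryThenFetchAction, which constructs an AsyncAction object and then calls its start() method.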
public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction {
    @Override
    protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

    private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
        final AtomicArray<FetchSearchResult> fetchResults;
        final AtomicArray<IntArrayList> docIdsToLoad;

        private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
            super(request, listener);
            // one slot per shard, sized like the first-phase result array
            fetchResults = new AtomicArray<>(firstResults.length());
            docIdsToLoad = new AtomicArray<>(firstResults.length());
        }
    }
}
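The class diagram for AsyncAction, BaseAsyncAction and TransportSearchQueryThenFetchAction is as follows:
As the diagram and the code above show, calling start() on AsyncAction actually runs the start() method of BaseAsyncAction. Constructing an AsyncAction does two things:
(1) It calls the constructor of the parent class BaseAsyncAction
(2) It initializes two arrays, fetchResults and docIdsToLoad
fetchResults:
Stores, per shardIndex, the FetchSearchResult produced once the Fetch phase completes
docIdsToLoad:
Stores, per shardIndex, the set of document ids that remain to be fetched once the Query phase completes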
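The idea of one result slot per shard, addressed by shardIndex, can be sketched with the JDK alone. This is a minimal illustration, not the Elasticsearch AtomicArray class: the class name ShardSlots and its methods are hypothetical, and AtomicReferenceArray is used here only as a stand-in.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical stand-in for the per-shard arrays; not the Elasticsearch AtomicArray class.
final class ShardSlots {
    // One slot per shard, addressed by shardIndex; a slot stays null until its shard reports.
    private final AtomicReferenceArray<List<Integer>> docIdsToLoad;

    ShardSlots(int shardCount) {
        this.docIdsToLoad = new AtomicReferenceArray<>(shardCount);
    }

    // Called when the Query phase of one shard has produced its doc ids.
    void setDocIds(int shardIndex, List<Integer> docIds) {
        docIdsToLoad.set(shardIndex, docIds);
    }

    List<Integer> docIds(int shardIndex) {
        return docIdsToLoad.get(shardIndex);
    }

    // Number of shards that actually have documents left to fetch.
    int shardsWithDocs() {
        int count = 0;
        for (int i = 0; i < docIdsToLoad.length(); i++) {
            List<Integer> ids = docIdsToLoad.get(i);
            if (ids != null && !ids.isEmpty()) {
                count++;
            }
        }
        return count;
    }
}
```

Because each shard writes only to its own index, threads handling different shard responses never contend for the same slot.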
The constructor of BaseAsyncAction is as follows:
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    protected BaseAsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
        ...
        // resolve the index names and aliases in the request to concrete indices
        String[] concreteIndices = clusterState.metaData().concreteIndices(request.indicesOptions(), request.indices());
        Map<String, Set<String>> routingMap = clusterState.metaData().resolveSearchRouting(request.routing(), request.indices());
        // pick the shards to search, honouring routing and preference
        shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
        expectedSuccessfulOps = shardsIts.size();
        expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
        firstResults = new AtomicArray<>(shardsIts.size());
        ...
    }
}
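The constructor of the parent class BaseAsyncAction mainly initializes the following data:
clusterState: the cluster state
nodes: the nodes in the cluster
concreteIndices: the indices named in the request, resolved to the concrete indices that will be searched
shardsIts: the shards of those indices that need to be searched, with routing and shard preference applied
expectedSuccessfulOps: the number of shards to search (shardsIts.size()), one successful result being expected per shard
expectedTotalOps: the total number of shard-level operations to account for (shardsIts.totalSizeWith1ForEmpty()), that is, every shard copy in every shard group, with an empty group counted as 1
firstResults: stores the Query result of each shard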
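The difference between the two expected counters is easier to see with numbers. The sketch below assumes that a shard group is represented only by its number of available copies (primary plus replicas). The class and method names are hypothetical, and the arithmetic reflects what the method name totalSizeWith1ForEmpty() suggests rather than a copy of the Elasticsearch implementation.

```java
import java.util.List;

// Hypothetical helper; each Integer is the number of available copies in one shard group.
final class ExpectedOps {
    // One successful result is expected per shard group.
    static int expectedSuccessfulOps(List<Integer> copiesPerGroup) {
        return copiesPerGroup.size();
    }

    // Every copy counts as one potential operation; an empty group still counts as 1
    // so that its failure can be accounted for.
    static int expectedTotalOps(List<Integer> copiesPerGroup) {
        int total = 0;
        for (int copies : copiesPerGroup) {
            total += Math.max(copies, 1);
        }
        return total;
    }
}
```

For three shard groups with 2, 0 and 3 available copies, this gives 3 expected successful operations and 2 + 1 + 3 = 6 expected total operations.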
Search requests support a search preference through the preference parameter. The relevant code is as follows:
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId, DiscoveryNodes nodes, @Nullable String preference) {
    Preference preferenceType = Preference.parse(preference);
    switch (preferenceType) {
        case PREFER_NODE:
            // the node id follows the "_prefer_node:" prefix
            return indexShard.preferNodeActiveInitializingShardsIt(preference.substring(Preference.PREFER_NODE.type().length() + 1));
        case LOCAL:
            return indexShard.preferNodeActiveInitializingShardsIt(localNodeId);
        case PRIMARY:
            return indexShard.primaryActiveInitializingShardIt();
        case PRIMARY_FIRST:
            return indexShard.primaryFirstActiveInitializingShardsIt();
        case ONLY_LOCAL:
            return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
        case ONLY_NODE:
            String nodeId = preference.substring(Preference.ONLY_NODE.type().length() + 1);
            ensureNodeIdExists(nodes, nodeId);
            return indexShard.onlyNodeActiveInitializingShardsIt(nodeId);
        default:
            throw new ElasticsearchIllegalArgumentException("unknown preference [" + preferenceType + "]");
    }
}
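The preference types have the following meanings:
_prefer_node: the query prefers to run on the node with the supplied node id
_local: the query prefers to run on shards allocated to the local node
_primary: the query runs only on primary shards
_primary_first: the query runs on primary shards and, if a primary is unavailable, falls back to other shard copies
_only_local: the query runs only on shards allocated to the local node
_only_node: the query is restricted to the node with the supplied node id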
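How a preference string such as "_only_node:abc" splits into a type and a node id can be sketched as follows. This is an illustration under assumptions: PreferenceSketch, its Type enum and both methods are hypothetical names, and only the substring arithmetic (prefix length plus one for the colon) mirrors the excerpt above.

```java
// Hypothetical sketch; not the Elasticsearch Preference enum.
final class PreferenceSketch {
    enum Type {
        PREFER_NODE("_prefer_node"),
        LOCAL("_local"),
        PRIMARY("_primary"),
        PRIMARY_FIRST("_primary_first"),
        ONLY_LOCAL("_only_local"),
        ONLY_NODE("_only_node");

        final String prefix;

        Type(String prefix) {
            this.prefix = prefix;
        }
    }

    // Matches on the text before the first ':' so that "_primary" and
    // "_primary_first" cannot be confused with each other.
    static Type parseType(String preference) {
        int colon = preference.indexOf(':');
        String name = colon < 0 ? preference : preference.substring(0, colon);
        for (Type type : Type.values()) {
            if (type.prefix.equals(name)) {
                return type;
            }
        }
        throw new IllegalArgumentException("unknown preference [" + preference + "]");
    }

    // Skips the prefix and the ':' separator, as the substring calls in the excerpt do.
    static String nodeId(String preference, Type type) {
        return preference.substring(type.prefix.length() + 1);
    }
}
```

For example, parseType("_only_node:abc") yields ONLY_NODE, and nodeId("_only_node:abc", Type.ONLY_NODE) yields "abc".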
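Once the AsyncAction constructor has finished initializing these parameters, start() on BaseAsyncAction is called to begin the search.
2. Overall flow of the QUERY phase
The start() method iterates over every shard to be searched and calls performFirstPhase() on each one to obtain its search result.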
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    public void start() {
        ...
        int shardIndex = -1;
        for (final ShardIterator shardIt : shardsIts) {
            shardIndex++;
            // first copy of this shard to try; null if the group has no usable copy
            final ShardRouting shard = shardIt.nextOrNull();
            if (shard != null) {
                performFirstPhase(shardIndex, shardIt, shard);
            }
            ...
        }
    }
}
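performFirstPhase() looks up the node that holds the shard and runs the search on that node. When the result comes back it calls onFirstPhaseResult(), which records the result (including the doc ids that still need to be fetched) and then decides whether the second phase, Fetch, can start.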
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) {
        if (shard == null) {
            ...
        } else {
            final DiscoveryNode node = nodes.get(shard.currentNodeId());
            if (node == null) {
                ...
            } else {
                String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
                sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime(), useSlowScroll), new SearchServiceListener<FirstResult>() {
                    @Override
                    public void onResult(FirstResult result) {
                        onFirstPhaseResult(shardIndex, shard, result, shardIt);
                    }
                    @Override
                    public void onFailure(Throwable t) {
                        onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
                    }
                });
            }
        }
    }
}
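onFirstPhaseResult() mainly calls processFirstPhaseResult() to store the first-phase Query result in firstResults. It then checks whether the number of operations accounted for so far (totalOps) equals the expected number (expectedTotalOps). If it does, the search moves to the second phase and runs Fetch to retrieve the document details.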
protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
    void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
        result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
        processFirstPhaseResult(shardIndex, shard, result);
        successfulOps.incrementAndGet();
        // a success also accounts for the copies of this shard that no longer need to be tried
        final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
        if (xTotalOps == expectedTotalOps) {
            try {
                innerMoveToSecondPhase();
            } catch (Throwable e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "] while moving to second phase", e);
                }
                raiseEarlyFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e, buildShardFailures()));
            }
        }
        ...
    }
}
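The completion check in onFirstPhaseResult() relies on an atomic counter: each successful shard adds 1 for itself plus the number of copies that were never tried, and only the response that brings the counter exactly to the expected total triggers the next phase. A minimal sketch of that pattern, with the hypothetical class name PhaseCounter, could look like this:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the counting pattern; not Elasticsearch code.
final class PhaseCounter {
    private final int expectedTotalOps;
    private final AtomicInteger totalOps = new AtomicInteger();

    PhaseCounter(int expectedTotalOps) {
        this.expectedTotalOps = expectedTotalOps;
    }

    // Returns true only for the single call that completes the phase.
    // remainingCopies is the number of untried copies of the shard that just succeeded.
    boolean onShardSuccess(int remainingCopies) {
        return totalOps.addAndGet(remainingCopies + 1) == expectedTotalOps;
    }
}
```

With two shards of two copies each, the expected total is 4. The first success with one remaining copy brings the counter to 2, and the second brings it to 4, so only the second call returns true. Because addAndGet is atomic, concurrent responses cannot both observe the final value.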
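3. Searching on a shard
As the previous section showed, Elasticsearch finds the node that holds the shard to be searched and uses sendExecuteFirstPhase() to send the search request to that node, where the Query is executed and the search result is produced. Each search_type overrides sendExecuteFirstPhase() in its own way.
1. For QUERY_THEN_FETCH, sendExecuteFirstPhase() delegates to SearchServiceTransportAction.sendExecuteQuery() to send the search request on.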
private class AsyncAction extends BaseAsyncAction<QuerySearchResultProvider> {
    @Override
    protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
        searchService.sendExecuteQuery(node, request, listener);
    }
}
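2. sendExecuteQuery() in SearchServiceTransportAction first checks whether the target node is the current node:
2.1 If it is the current node, execute() is called directly. Inside execute(), the search thread pool runs call() on the callable that was passed in. If the result is null, the listener's onFailure() is invoked; otherwise its onResult() is invoked.
The callable's call() method simply calls executeQueryPhase() on SearchService.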
public class SearchServiceTransportAction extends AbstractComponent {
    public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final SearchServiceListener<QuerySearchResultProvider> listener) {
        if (clusterService.state().nodes().localNodeId().equals(node.id())) {
            // local node: run the query phase on the search thread pool
            execute(new Callable<QuerySearchResultProvider>() {
                @Override
                public QuerySearchResultProvider call() throws Exception {
                    return searchService.executeQueryPhase(request);
                }
            }, listener);
        } else {
            ...
        }
    }

    private <T> void execute(final Callable<? extends T> callable, final SearchServiceListener<T> listener) {
        try {
            threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
                @Override
                public void run() {
                    T result = null;
                    Throwable error = null;
                    try {
                        result = callable.call();
                    } catch (Throwable t) {
                        error = t;
                    } finally {
                        if (result == null) {
                            assert error != null;
                            listener.onFailure(error);
                        } else {
                            assert error == null : error;
                            listener.onResult(result);
                        }
                    }
                }
            });
        } catch (Throwable t) {
            // for example, the thread pool rejected the task
            listener.onFailure(t);
        }
    }
}
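The execute() helper above is an instance of a general pattern: run a Callable on an executor and report the outcome to a listener with separate success and failure callbacks. The following sketch reproduces the pattern with JDK types only. ListenerExecutor and ResultListener are hypothetical names, and a plain java.util.concurrent.Executor stands in for the Elasticsearch thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

// Hypothetical sketch of the callable-plus-listener pattern; not Elasticsearch code.
final class ListenerExecutor {
    interface ResultListener<T> {
        void onResult(T result);

        void onFailure(Throwable t);
    }

    static <T> void execute(Executor pool, Callable<? extends T> task, ResultListener<T> listener) {
        try {
            pool.execute(() -> {
                T result;
                try {
                    result = task.call();
                } catch (Throwable t) {
                    // the task itself failed
                    listener.onFailure(t);
                    return;
                }
                if (result == null) {
                    // a null result is treated as a failure in this sketch
                    listener.onFailure(new IllegalStateException("task returned null"));
                } else {
                    listener.onResult(result);
                }
            });
        } catch (RuntimeException rejected) {
            // the executor refused the task, for example because its queue is full
            listener.onFailure(rejected);
        }
    }
}
```

A caller would pass a real thread pool as the executor. The point of the design is that the caller never blocks: it hands over the listener and returns, and the outcome arrives on whichever thread ran the task.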
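2.2 If the target is not the current node, the request has to be sent to that node with transportService.sendRequest(). Two of its arguments are particularly important: QUERY_ACTION_NAME and a BaseTransportResponseHandler object.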
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new BaseTransportResponseHandler<QuerySearchResultProvider>() {
    @Override
    public QuerySearchResult newInstance() {
        return new QuerySearchResult();
    }
    @Override
    public void handleResponse(QuerySearchResultProvider response) {
        listener.onResult(response);
    }
    @Override
    public void handleException(TransportException exp) {
        listener.onFailure(exp);
    }
    @Override
    public String executor() {
        return ThreadPool.Names.SAME;
    }
});
When Elasticsearch starts, SearchServiceTransportAction is injected, and its constructor registers a SearchQueryTransportHandler as the handler for QUERY_ACTION_NAME.
public class SearchServiceTransportAction extends AbstractComponent {
    @Inject
    public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) {
        super(settings);
        transportService.registerHandler(QUERY_ACTION_NAME, new SearchQueryTransportHandler());
        ...
    }
}
When SearchQueryTransportHandler receives the message, it also calls executeQueryPhase() on SearchService to obtain the search result. In this way local and remote searches end up in the same method.
private class SearchQueryTransportHandler extends BaseTransportRequestHandler<ShardSearchTransportRequest> {
    @Override
    public ShardSearchTransportRequest newInstance() {
        return new ShardSearchTransportRequest();
    }
    @Override
    public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
        QuerySearchResultProvider result = searchService.executeQueryPhase(request);
        channel.sendResponse(result);
    }
    @Override
    public String executor() {
        return ThreadPool.Names.SEARCH;
    }
}
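The local-versus-remote decision can be reduced to a small sketch: a direct call when the target is the local node, and a lookup of the handler registered under an action name otherwise. Everything here is a simplification under assumptions. QueryDispatchSketch, the action name string, and the use of plain strings for requests and results are hypothetical, and the network hop is replaced by a map lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; the "remote" path is simulated by a handler lookup, not a network call.
final class QueryDispatchSketch {
    static final String QUERY_ACTION = "search/phase/query"; // illustrative name only

    private final String localNodeId;
    private final Function<String, String> queryService; // stands in for executeQueryPhase()
    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    String lastPath; // records which branch served the last request

    QueryDispatchSketch(String localNodeId, Function<String, String> queryService) {
        this.localNodeId = localNodeId;
        this.queryService = queryService;
        // registered once at start-up, as the constructor in the excerpt does
        handlers.put(QUERY_ACTION, queryService);
    }

    String sendExecuteQuery(String targetNodeId, String request) {
        if (localNodeId.equals(targetNodeId)) {
            lastPath = "local";
            return queryService.apply(request);
        }
        lastPath = "remote";
        return handlers.get(QUERY_ACTION).apply(request);
    }
}
```

Both branches end in the same service function, which is why a shard gives the same answer whether it lives on the coordinating node or on another node.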
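4. The Query logic
The main steps in executeQueryPhase() of SearchService are:
(1) Create the search context
(2) Pre-process the query
(3) Load the result from the cache or query Lucene
(4) If search_type is count, free the search context
(5) Record the slow query log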
public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
        final SearchContext context = createAndPutContext(request);
        try {
            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            loadOrExecuteQueryPhase(request, context, queryPhase);
            if (context.searchType() == SearchType.COUNT) {
                // a count needs no Fetch phase, so the context can be released now
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
            return context.queryResult();
        } catch (Throwable e) {
            ...
        } finally {
            cleanContext(context);
        }
    }
}
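When createAndPutContext() creates the context, it mainly does the following:
(1) Obtains the Lucene searcher for the index shard, that is, the Lucene search object
(2) Sets up scroll, parses the source, and sets from and size
(3) Pre-processes the query, converting the Elasticsearch query into a Lucene query
(4) Keeps the search context alive for 5 minutes by default
Once the context has been created, loadOrExecuteQueryPhase() runs the query. If search_type is COUNT and the caching conditions are met, the result is taken from the cache; otherwise execute() on QueryPhase is called to run the Lucene search.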
public class QueryPhase implements SearchPhase {
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        boolean rescore = false;
        try {
            searchContext.queryResult().from(searchContext.from());
            searchContext.queryResult().size(searchContext.size());
            Query query = searchContext.query();
            TopDocs topDocs;
            // each shard must return enough hits to cover the requested page
            int numDocs = searchContext.from() + searchContext.size();
            if (searchContext.searchType() == SearchType.COUNT || numDocs == 0) {
                // only the total hit count is needed
                TotalHitCountCollector collector = new TotalHitCountCollector();
                searchContext.searcher().search(query, collector);
                topDocs = new TopDocs(collector.getTotalHits(),
                        Lucene.EMPTY_SCORE_DOCS, 0);
            } else if (searchContext.searchType() == SearchType.SCAN) {
                topDocs = searchContext.scanContext().execute(searchContext);
            } else {
                if (!searchContext.useSlowScroll() && searchContext.request().scroll() != null) {
                    ...
                } else {
                    if (searchContext.sort() != null) {
                        topDocs = searchContext.searcher().search(
                                query,
                                null,
                                numDocs, searchContext.sort(),
                                searchContext.trackScores(),
                                searchContext.trackScores());
                    } else {
                        topDocs = searchContext.searcher().search(query, numDocs);
                    }
                }
            }
            searchContext.queryResult().topDocs(topDocs);
        } catch (Throwable e) {
            throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
        } finally {
            searchContext.searcher().finishStage(ContextIndexSearcher.Stage.MAIN_QUERY);
        }
        if (rescore) { // only if we do a regular search
            rescorePhase.execute(searchContext);
        }
        suggestPhase.execute(searchContext);
        facetPhase.execute(searchContext);
        aggregationPhase.execute(searchContext);
    }
}
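The line numDocs = from + size explains why every shard collects more than size hits: the coordinating node can only cut out the requested page after merging all shards, so each shard has to supply its own top from + size candidates. The sketch below shows that per-shard selection with a bounded min-heap. It is an illustration in plain Java, not Lucene code: ShardTopDocs is a hypothetical name and a shard is reduced to an array of scores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of per-shard top-N selection; not Lucene's collector implementation.
final class ShardTopDocs {
    // Returns the from + size highest scores of one shard, best first.
    static List<Float> topScores(float[] shardScores, int from, int size) {
        int numDocs = from + size;
        if (numDocs == 0) {
            return Collections.emptyList();
        }
        // min-heap: the weakest of the current candidates sits at the head
        PriorityQueue<Float> heap = new PriorityQueue<>();
        for (float score : shardScores) {
            heap.offer(score);
            if (heap.size() > numDocs) {
                heap.poll(); // drop the weakest candidate
            }
        }
        List<Float> top = new ArrayList<>(heap);
        top.sort(Collections.reverseOrder());
        return top;
    }
}
```

With scores 0.3, 0.9, 0.1 and 0.7 and a request for from = 1, size = 1, the shard still returns two candidates, 0.9 and 0.7, because it cannot know which of them will fall on the requested page after the merge.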
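When the Lucene search completes, the top docs are stored in the search context, which finishes the search on one shard. Once the number of completed shard operations equals the expected number, innerMoveToSecondPhase() is called to enter the second phase and run Fetch. That logic is covered in the third article.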