Elasticsearch源码分析-Suggest分析
0. 前言
对于大部分搜索系统而言,都会有搜索提示功能,即输入一个词,会提示拥有该词/前缀词的信息,而elasticsearch的suggest能较好地支持该功能
搜索提示suggest请求对应的url pattern为 /_suggest 和 /{index}/_suggest,支持Get和Post 请求
curl -X POST 'localhost:9200/test_index/_suggest?pretty' -d '{
"myCompletion" : {
"text" : "10",
"completion" : {
"field" : "suggest term"
}
}
}'
1 请求Handler
suggest url对应的handler为RestSuggestAction,支持的参数如下:
routing
: 路由信息
preference
: 偏好信息
在解析完参数,会执行client.suggest()进行搜索提示,传入的action参数为SuggestAction.INSTANCE,它绑定的action为TransportSuggestAction类
// Excerpt: the module that wires action definitions to their transport implementations.
public class ActionModule extends AbstractModule {
@Override
protected void configure() {
// Bind the suggest action singleton to TransportSuggestAction.
registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
}
}
因此,在执行TransportAction.execute() 时,会执行TransportSuggestAction父类TransportBroadcastOperationAction.doExecute()方法,即为处理请求的Action入口
TransportSuggestAction类图
2 处理Action
TransportBroadcastOperationAction的doExecute()方法中,主要是调用AsyncBroadcastAction的start()方法执行suggest逻辑
public abstract class TransportBroadcastOperationAction<Request extends BroadcastOperationRequest,
Response extends BroadcastOperationResponse,
ShardRequest extends BroadcastShardOperationRequest,
ShardResponse extends BroadcastShardOperationResponse>
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncBroadcastAction(request, listener).start();
}
}
suggest 主要流程如下:
(1)根据请求的索引获取所有对应的shards信息
// Excerpt: constructor part of AsyncBroadcastAction, which resolves the target shards up front.
protected class AsyncBroadcastAction {
// Shard iterators returned by shards() for this request.
private final GroupShardsIterator shardsIts;
// Number of entries in shardsIts; compared against the completed-operation counter later.
private final int expectedOps;
protected AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
// ...
// shards() is implemented by the concrete action (e.g. TransportSuggestAction).
shardsIts = shards(clusterState, request, concreteIndices);
expectedOps = shardsIts.size();
}
}
获取suggest索引对应shards的方式与搜索时获取索引对应shards方式一致,主要就是
① 先获取索引对应的路由信息,包括匹配相应的通配符
② 根据路由信息和偏好信息获取对应的shards
// Excerpt: how the suggest action selects the shards a request is sent to.
public class TransportSuggestAction extends
TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
/**
 * Resolves the shard iterators for a suggest request from the request's routing,
 * indices and preference.
 */
@Override
protected GroupShardsIterator shards(ClusterState clusterState, SuggestRequest request, String[] concreteIndices) {
// Resolve the routing information for the requested indices
Map<String, Set<String>> routingMap = clusterState.metaData()
.resolveSearchRouting(request.routing(), request.indices());
// Select the shards according to the routing map and the preference
return clusterService.operationRouting()
.searchShards(clusterState, request.indices(), concreteIndices, routingMap, request.preference());
}
}
(2)遍历每一个shard执行performOperation()方法
// Excerpt: start() fans the request out, one operation per shard iterator.
protected class AsyncBroadcastAction {
private final GroupShardsIterator shardsIts;
private final int expectedOps;
/**
 * Responds immediately with an empty result when there are no shards; otherwise
 * dispatches one operation per shard iterator.
 */
public void start() {
if (shardsIts.size() == 0) {
// no shards
try {
listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
} catch (Throwable e) {
listener.onFailure(e);
}
return;
}
// count the local operations, and perform the non local ones
// shardIndex is the slot of this shard group in the responses array.
int shardIndex = -1;
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performOperation(shardIt, shard, shardIndex);
} else {
// really, no shards active in this group
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}
}
(3)对每个shard执行TransportSuggestAction的shardOperation()方法查询索引
①如果shard所在节点为当前节点,则使用线程池在本地执行shardOperation()方法,并将结果交给onOperation()处理
②如果shard所在节点不是当前节点,则使用tcp方式将请求发送到对应节点上,action name为SuggestAction.NAME + [s],即indices:data/read/suggest[s],发送到的handler为ShardTransportHandler
// Excerpt: per-shard dispatch, either on a local thread pool or over the transport layer.
protected class AsyncBroadcastAction {
// NOTE(review): this constructor is named TransportBroadcastOperationAction, so it belongs to the
// enclosing action class rather than to AsyncBroadcastAction; it is shown here only because it
// registers the shard-level handler that the remote branch below sends to.
protected TransportBroadcastOperationAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(settings, actionName, threadPool, actionFilters);
// The shard-level action name is the action name with an "[s]" suffix.
this.transportShardAction = actionName + "[s]";
transportService.registerHandler(transportShardAction, new ShardTransportHandler());
}
/**
 * Runs the shard operation for one shard: on the local executor when the shard is on this
 * node, otherwise by sending the shard request to the node that holds it. Every outcome,
 * success or failure, is reported through onOperation().
 */
protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {
// Local shard: execute shardOperation() on the configured executor.
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {
// Remote shard: look up the node that currently holds it.
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
// ...
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
}
});
}
}
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
}
}
在shard所在节点接到请求后,会同样执行shardOperation()方法
// Handler registered for the shard-level action; it runs on the node that receives the shard request.
class ShardTransportHandler extends BaseTransportRequestHandler<ShardRequest> {
// Supplies an empty shard request instance via newShardRequest().
@Override
public ShardRequest newInstance() {
return newShardRequest();
}
// Returns the executor name held by the enclosing action.
@Override
public String executor() {
return executor;
}
// Runs the shard operation and sends its result back on the channel.
@Override
public void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
channel.sendResponse(shardOperation(request));
}
}
(4)如果所有shard均已执行完毕,则调用finishHim()合并shard结果并返回响应
每收到一个shard的响应后,counterOps会加一;当counterOps与expectedOps(请求的索引对应的shards总数)相等时,执行finishHim()方法
在finishHim()方法中,主要是调用newResponse()来构造响应结果
// Excerpt: collecting shard responses and completing the request.
protected class AsyncBroadcastAction {
/**
 * Stores one shard's response in its slot and finishes the request once the number of
 * completed operations reaches expectedOps.
 */
protected void onOperation(ShardRouting shard, int shardIndex, ShardResponse response) {
logger.trace("received response for {}", shard);
shardsResponses.set(shardIndex, response);
// incrementAndGet() returns the updated count, so only one caller sees it equal expectedOps.
if (expectedOps == counterOps.incrementAndGet()) {
finishHim();
}
}
/**
 * Builds the final response from all shard responses and hands it to the listener;
 * any failure while building or delivering it is reported via onFailure().
 */
protected void finishHim() {
try {
listener.onResponse(newResponse(request, shardsResponses, clusterState));
} catch (Throwable e) {
listener.onFailure(e);
}
}
}
遍历每一个shard的响应结果,合并后返回SuggestResponse对象
// Excerpt: merging the per-shard suggest results into one SuggestResponse.
public class TransportSuggestAction extends
TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
/**
 * Walks every slot of shardsResponses, counting successes and failures, grouping the
 * suggestions of successful shards, and returns the reduced result together with the
 * shard statistics.
 */
@Override
protected SuggestResponse newResponse(SuggestRequest request,
AtomicReferenceArray shardsResponses,
ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
final Map<String, List<Suggest.Suggestion>> groupedSuggestions = new HashMap<>();
// Created lazily; stays null when no shard failed.
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
// A failure was stored in this slot instead of a response.
failedShards++;
if (shardFailures == null) {
shardFailures = newArrayList();
}
shardFailures.add(new DefaultShardOperationFailedException(
(BroadcastShardOperationFailedException) shardResponse)
);
} else {
// Successful shard: add its suggestions to the grouped map.
Suggest suggest = ((ShardSuggestResponse) shardResponse).getSuggest();
Suggest.group(groupedSuggestions, suggest);
successfulShards++;
}
}
// The total shard count is the array length, including null (ignored) slots.
return new SuggestResponse(
new Suggest(Suggest.reduce(groupedSuggestions)),
shardsResponses.length(),
successfulShards,
failedShards,
shardFailures
);
}
}
3 suggest逻辑实现
在shardOperation()方法中,主要步骤如下:
① 获取IndexService和IndexShard对象
② 并解析suggest source
③ 创建suggest context
④ 调用SuggestPhase.execute()方法进入suggest阶段
// Excerpt: shard-level suggest execution.
public class TransportSuggestAction extends
TransportBroadcastOperationAction<SuggestRequest, SuggestResponse, ShardSuggestRequest, ShardSuggestResponse> {
/**
 * Executes the suggest request against one shard: acquires a searcher, parses the
 * suggest source into a SuggestionSearchContext and runs the suggest phase.
 * Returns an empty Suggest when the request carries no suggest source.
 *
 * @throws ElasticsearchException wrapping any failure raised while parsing or executing
 */
@Override
protected ShardSuggestResponse shardOperation(ShardSuggestRequest request) throws ElasticsearchException {
// Look up the index service and shard (elided in this excerpt)
// ...
try (Engine.Searcher searcher = indexShard.acquireSearcher("suggest")) {
BytesReference suggest = request.suggest();
if (suggest != null && suggest.length() > 0) {
parser = XContentFactory.xContent(suggest).createParser(suggest);
// Build the suggest context from the parsed source
final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(
parser,
indexService.mapperService(),
indexService.queryParserService(),
request.shardId().getIndex(),
request.shardId().id()
);
final Suggest result = suggestPhase.execute(context, searcher.searcher());
return new ShardSuggestResponse(request.shardId(), result);
}
// No suggest source supplied: respond with an empty Suggest.
return new ShardSuggestResponse(request.shardId(), new Suggest());
} catch (Throwable ex) {
// Do not swallow the failure: rethrow with the cause preserved so the caller can
// record this shard as failed.
throw new ElasticsearchException("failed to execute suggest", ex);
} finally {
// ...
}
}
}
在suggest阶段,主要是遍历每一个suggestion,调用Suggester.execute()执行suggest
// Excerpt: the suggest phase runs every named suggestion of the context through its suggester.
public class SuggestPhase extends AbstractComponent implements SearchPhase {
/**
 * Executes each suggestion in the context with its own Suggester and collects the
 * non-null results into a Suggest.
 *
 * @throws ElasticsearchException wrapping an IOException raised by a suggester
 */
public Suggest execute(SuggestionSearchContext suggest, IndexSearcher searcher) {
try {
// One scratch buffer is shared by all suggesters in this call.
CharsRefBuilder spare = new CharsRefBuilder();
final List<Suggestion<? extends Entry<? extends Option>>> suggestions =
new ArrayList<>(suggest.suggestions().size());
for (Map.Entry<String, SuggestionSearchContext.SuggestionContext> entry : suggest.suggestions().entrySet()) {
SuggestionSearchContext.SuggestionContext suggestion = entry.getValue();
Suggester<SuggestionContext> suggester = suggestion.getSuggester();
Suggestion<? extends Entry<? extends Option>> result = suggester
.execute(entry.getKey(), suggestion, searcher, spare);
// A suggester may return null, in which case nothing is added for this suggestion.
if (result != null) {
assert entry.getKey().equals(result.name);
suggestions.add(result);
}
}
return new Suggest(Suggest.Fields.SUGGEST, suggestions);
} catch (IOException e) {
throw new ElasticsearchException("I/O exception during suggest phase", e);
}
}
}
Suggester的execute()方法,主要是判断searcher对应reader的文档数是否为0:如果为0则直接返回null;如果不为0,则进入innerExecute(),交给Suggester的子类执行具体的逻辑
// Excerpt: base class of all suggesters; execute() guards the subclass-specific innerExecute().
public abstract class Suggester<T extends SuggestionSearchContext.SuggestionContext> {
/**
 * Returns null when the reader has no documents; otherwise delegates to innerExecute().
 */
public Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>
execute(String name, T suggestion, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
// #3469 We want to ignore empty shards
if (searcher.getIndexReader().numDocs() == 0) {
return null;
}
return innerExecute(name, suggestion, searcher, spare);
}
}
Suggester下面有三个子类:CompletionSuggester、PhraseSuggester和TermSuggester,分别对应不同类型的suggest
Suggester类图
下面即为completion suggest的具体逻辑,主要是利用lucene的lookup方法查询前缀索引,最后调用CollectionUtil.introSort()进行内省排序
// Excerpt: the completion suggester implementation.
public class CompletionSuggester extends Suggester<CompletionSuggestionContext> {
/**
 * Looks the suggest text up in every segment of the reader, merges the per-segment
 * results by key (keeping the higher score), sorts them with scoreComparator and adds
 * at most getSize() options to the single entry of the returned suggestion.
 *
 * @throws ElasticsearchException when the target field is not mapped as a completion field
 */
@Override
protected Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> innerExecute(String name,
CompletionSuggestionContext suggestionContext, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
if (suggestionContext.mapper() == null || !(suggestionContext.mapper() instanceof CompletionFieldMapper)) {
throw new ElasticsearchException("Field [" + suggestionContext.getField() + "] is not a completion suggest field");
}
final IndexReader indexReader = searcher.getIndexReader();
CompletionSuggestion completionSuggestion = new CompletionSuggestion(name, suggestionContext.getSize());
// Decode the UTF-8 suggest text into the shared scratch buffer.
spare.copyUTF8Bytes(suggestionContext.getText());
CompletionSuggestion.Entry completionSuggestEntry = new CompletionSuggestion.Entry(new StringText(spare.toString()), 0, spare.length());
completionSuggestion.addTerm(completionSuggestEntry);
String fieldName = suggestionContext.getField();
// Sized for up to getSize() results from each segment.
Map<String, CompletionSuggestion.Entry.Option> results = Maps.newHashMapWithExpectedSize(indexReader.leaves().size() * suggestionContext.getSize());
for (AtomicReaderContext atomicReaderContext : indexReader.leaves()) {
AtomicReader atomicReader = atomicReaderContext.reader();
Terms terms = atomicReader.fields().terms(fieldName);
// Only terms written by the completion postings format expose a lookup.
if (terms instanceof Completion090PostingsFormat.CompletionTerms) {
final Completion090PostingsFormat.CompletionTerms lookupTerms = (Completion090PostingsFormat.CompletionTerms) terms;
final Lookup lookup = lookupTerms.getLookup(suggestionContext.mapper(), suggestionContext);
if (lookup == null) {
// we don't have a lookup for this segment.. this might be possible if a merge dropped all
// docs from the segment that had a value in this segment.
continue;
}
List<Lookup.LookupResult> lookupResults = lookup.lookup(spare.get(), false, suggestionContext.getSize());
for (Lookup.LookupResult res : lookupResults) {
final String key = res.key.toString();
final float score = res.value;
final Option value = results.get(key);
if (value == null) {
// First time this key is seen: record it with its score and optional payload.
final Option option = new CompletionSuggestion.Entry.Option(new StringText(key), score, res.payload == null ? null
: new BytesArray(res.payload));
results.put(key, option);
} else if (value.getScore() < score) {
// Same key seen in another segment with a higher score: keep the better one.
value.setScore(score);
value.setPayload(res.payload == null ? null : new BytesArray(res.payload));
}
}
}
}
final List<CompletionSuggestion.Entry.Option> options = new ArrayList<>(results.values());
// NOTE(review): scoreComparator is defined outside this excerpt; presumably it orders by descending score — confirm.
CollectionUtil.introSort(options, scoreComparator);
// Keep only the first getSize() options after sorting.
int optionCount = Math.min(suggestionContext.getSize(), options.size());
for (int i = 0 ; i < optionCount ; i++) {
completionSuggestEntry.addOption(options.get(i));
}
return completionSuggestion;
}
}
4 FST 结构
对于Completion类型的suggest来说,使用的是FST(Finite State Transducers)存储结构,即有穷状态转换器,类似于前缀树或者字典树
我们有如下6个词:mop、moth、pop、star、stop和top,那么构建的FST如下图:
FST文件存储