Elasticsearch 5.x 源码分析(6)Request
本篇只关注一个细节,就是在一个端到端的场景,一个Request是如何构造并传输,从Client 到Node 的Action 接收端又如何被parse,接着传输到各个的 shards 去处理;反之一个Response 生成后如何一直传输回到 Node 端最后回到 Transport Client 或者 REST Client。本篇主要回答了这几天处理的一个问题:如果采用REST Client 来做 Query 查询的话,有没有一种办法可以把 JSON 的Response 解析成 SearchResponse 对象如 Transport Client 般地处理回复。
所以我们分两部分来讲:1. 看一下各种Request 和Response 的源码;2. 看一下如何去parse 一个 JSON 成一个SearchResponse 对象。
Transport Client 调用链
我们先从最简单的例子开始吧,由于我在项目开发中用 Template Search 比较多那么就看一个最简单的 Template Search 的例子
SearchResponse sr = new SearchTemplateRequestBuilder(client)
.setScript("template_gender")
.setScriptType(ScriptType.STORED)
.setScriptParams(template_params)
.setRequest(new SearchRequest())
.get()
.getResponse();
public class SearchTemplateRequestBuilder
extends ActionRequestBuilder<SearchTemplateRequest, SearchTemplateResponse, SearchTemplateRequestBuilder> {
SearchTemplateRequestBuilder(ElasticsearchClient client, SearchTemplateAction action) {
super(client, action, new SearchTemplateRequest());
}
public SearchTemplateRequestBuilder(ElasticsearchClient client) {
this(client, SearchTemplateAction.INSTANCE);
}
public SearchTemplateRequestBuilder setRequest(SearchRequest searchRequest) {
request.setRequest(searchRequest);
return this;
}
顾名思义,SearchTemplateRequestBuilder
就是负责生成一个SearchTemplateRequest
,我们看看它的继承关系
ActionRequest
可以理解成一个通用的可以被Action 层处理的Request 对象,SearchTemplateRequest
就是在外面包了一些自身的属性,如profile,script 等,这里我们不关心。打开ActionRequest
里面就更简单了,并没有任何逻辑,弄了一个方法public abstract ActionRequestValidationException validate();
给Action 层做一些字段校验而已。而ActionRequest
又继承自TransportRequest
乃至TransportMessage
那么不用看基本就能猜到ActionRequest
直接就用于节点间传输了,而协议就是走netty 的Transport 了。
还有一个地方就是,在SearchTemplateRequest
的构造函数里指定了一个SearchTemplateAction.INSTANCE
,这个变量指定了这个Request
请求期望的目的是这个 INSTANCE key的接受者。而这个接受者则在 MustachePlugin
定义了,这里有种感觉就是SearchTemplateAction 这个Action 层好像完全被架空了,仅用来路由请求而已,感觉怪怪的。
而TransportSearchTemplateAction
只做了一件事,就是从Request
里面抽取出script
变量,然后调用scriptService
去load 和compile 这个script 并渲染出最终的这个SearchRequest
对象。
static SearchRequest convert(SearchTemplateRequest searchTemplateRequest, SearchTemplateResponse response, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) throws IOException {
Script script = new Script(searchTemplateRequest.getScriptType(), TEMPLATE_LANG, searchTemplateRequest.getScript(),
searchTemplateRequest.getScriptParams() == null ? Collections.emptyMap() : searchTemplateRequest.getScriptParams());
CompiledTemplate compiledScript = scriptService.compileTemplate(script, SEARCH);
BytesReference source = compiledScript.run(script.getParams());
response.setSource(source);
最后交给了TransportSearchAction
来把Request当做一个普通的Request来处理。
由于netty处理请求采用异步的方式,所以这里需要一个
onResponse()
的回调;SearchAction
把请求发送给SearchService
来执行,SearchService
更多的是去控制Lucene 的逻辑,所以从SearchAction
到SearchService
层之间需要一个转换,来把SearchRequest
转换成Elasticsearch 内部的一种特殊的Request 对象ShardsSearchRequest
,这个过程在TransportSearchAction
的doExecute
方法里做,在这里创建了一个SearchQueryThenFetchAsyncAction
来去控制整个发送和接收的控制,包括收到每个shards的Response 后如何做 reduce
和sort
等等,在SearchQueryThenFetchAsyncAction
你会看到一句getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()), buildShardSearchRequest(shardIt), getTask(), listener);
。至此验证了最开始的想法,就是从TransportClient 创建的
Request
,将会一走到处理这类请求的Action
层,最终会转换成ShardSearchRequest
发送给 Service 层执行 Lucene调用并返回。SearchService
相应的Lucene 操作,这里就不再进去看了,有兴趣的可以直接去看 SearchService
的public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task)
方法;和QueryPhase
的public void execute(SearchContext searchContext) throws QueryPhaseExecutionException
方法。
那么SearchResponse 的话就直接反过来看就好了;从SearchAction
开始。。。。
REST Client 调用链
Transport Client 直接创建的是 SearchRequest
对象,并且直接发送到 TransportXXXAction 中去处理,而REST 则创建的是 JSON 格式的 Request,还是拿上面的 SearchTemplate 查询的例子来看,如果用REST Client构造一个SearchTemplate 的request 发送的话,那么接收的endpoint 就是RestSearchTemplateAction
,在这个类中我们看到这样一段。
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
// Creates the search request with all required params
SearchRequest searchRequest = new SearchRequest();
RestSearchAction.parseSearchRequest(searchRequest, request, null);
// Creates the search template request
SearchTemplateRequest searchTemplateRequest;
try (XContentParser parser = request.contentOrSourceParamParser()) {
searchTemplateRequest = PARSER.parse(parser, new SearchTemplateRequest(), null);
}
searchTemplateRequest.setRequest(searchRequest);
return channel -> client.execute(SearchTemplateAction.INSTANCE, searchTemplateRequest, new RestStatusToXContentListener<>(channel));
}
private static final ObjectParser<SearchTemplateRequest, Void> PARSER;
static {
PARSER = new ObjectParser<>("search_template");
PARSER.declareField((parser, request, s) ->
request.setScriptParams(parser.map())
, new ParseField("params"), ObjectParser.ValueType.OBJECT);
PARSER.declareString((request, s) -> {
request.setScriptType(ScriptType.FILE);
request.setScript(s);
}, new ParseField("file"));
PARSER.declareString((request, s) -> {
request.setScriptType(ScriptType.STORED);
request.setScript(s);
}, new ParseField("id"));
PARSER.declareBoolean(SearchTemplateRequest::setExplain, new ParseField("explain"));
PARSER.declareBoolean(SearchTemplateRequest::setProfile, new ParseField("profile"));
PARSER.declareField((parser, request, value) -> {
request.setScriptType(ScriptType.INLINE);
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
//convert the template to json which is the only supported XContentType (see CustomMustacheFactory#createEncoder)
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
request.setScript(builder.copyCurrentStructure(parser).string());
} catch (IOException e) {
throw new ParsingException(parser.getTokenLocation(), "Could not parse inline template", e);
}
} else {
request.setScript(parser.text());
}
}, new ParseField("inline", "template"), ObjectParser.ValueType.OBJECT_OR_STRING);
}
很容易发现,在RestAction接收到一个请求时,会通过RestSearchAction.parseSearchRequest(searchRequest, request, null);
来把一个RestRequest
转换成一个SearchRequest
,然后重新扔回去channel
中去,就好像一个TransportClient 一样,在最后的调用里会同传一个toXContentListener
里面有一个方法是SearchResponse
-> XContent 的转换,也就是toJson 同样的意义。
从 JSON Response 反解析成SearchResponse
很多时候,我们又想用REST Client的便利,但是我们又不想拿到一个 String 的Response 然后弄成一个JSON Object慢慢的去解析,我们希望还是直接处理一个SearchResponse 来的爽快。既然有了toXContent()
方法,那么有没有 fromXContent()
呢,答案是Elasticsearch 5.5 之前是没有的(或者仅限于SearchHits有)。。。
如果你去看ES 的5.5 branch或者master branch的 log,你会发现其中的一个重点commit有:
- 在5.3 时诸如SearchResponse,SearchHit,Aggregation,Suggest 等等类都是接口,内部有InternalXXXX这样的实现类,而5.5 则全部都变成了 final class
- 5.4,5.5 陆陆续续把SearchResponse、Aggregations等都加上了
fromXContent
方法,所以如果你直接在5.5 的代码下,你直接一句话就可以完成我想要解决的这个问题 - Aggregations 这个解析是比较特别的,和普通的结果集不同,aggs 的请求类型和结果类型都非常丰富,举个例子,一个termsAggregation 和一个topHitsAggregation 的结果就完全不一样,所以Aggregations 的
fromXContent()
所谓是最最最复杂的,从5.5的代码中就可以看到,ES为Aggregations 实现了一堆的parser
那么我要做的,目前就是把5.5的这部分代码暂时迁移过来5.3先用着了。。。
后话
有朋友支招说其实github上也有类似的项目了,如jest
可是它始终不是采用了原生的SearchResponse 对象,因此使用起来还是不是很带劲。
而Elasticsearch 自己为啥要这么大费周章话那么多人力物力重写那堆的SearchResponse、SearchHit、Aggregation 呢?答案就是ES也想出一个这样的REST Client,叫做 High Level Rest Client,目的就是直接在REST Client上操作SearchResponse 对象
scope
有兴趣的话可以上去GitHub 查看这个issue,期望的release 是ES5.6 和ES6.0
全篇完,有问题欢迎讨论。