Elasticsearch high-level client: query timeout settings have no effect

2020-01-19  梦想又照进现实

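Background

While upgrading our Elasticsearch query client, we found that with version 6.3.2 the timeout settings had no effect: neither the HTTP nor the socket timeout worked, and the timeout set on the SearchSourceBuilder was ignored as well. This post records the investigation into the root cause.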

The problem

Official API documentation:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.3/java-rest-high-search.html

Version:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.2</version>
</dependency>

Official explanation:

The setTimeout method is unreliable. The explanation given is:
Sadly, it is a best effort timeout, its not being checked on all places. Specifically, if you send a query that ends up being rewritten into many terms (fuzzy, or wildcard), that part (the rewrite part) does not check for a timeout.

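In other words, the timeout is best effort and is not checked at every stage. In particular, when a query ends up being rewritten into many terms (fuzzy or wildcard queries), the rewrite phase does not check the timeout.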

Other references:
Similar reports from other users:
1.
https://discuss.elastic.co/t/settimeout-of-searchrequestbuilder-not-working/11611/2

2. JD.com case study
https://www.liangzl.com/get-article-detail-136496.html

Problem and solution description:
The Elasticsearch issue "Timeout on search not respected" gives the same explanation as the reply quoted above: the search timeout is best effort, and the query rewrite phase (fuzzy or wildcard queries expanded into many terms) does not check it.

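With the Transport API, the last step of a query is actionGet() or get(), and a timeout can be passed at that point: actionGet(timeout), declared as T actionGet(long var1, TimeUnit var3) throws ElasticsearchException;. With this set, a timeout exception is thrown if no result arrives within the given time. Strictly speaking this is not a connection timeout but a processing timeout: the logic behind it is the timeout of a Java asynchronous Future. That is still enough for our needs, because an exception is thrown whenever processing does not finish within the configured time.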

FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), boolFilterBuilder);
SearchResponse searchResponse = client.prepareSearch(indexName)
        .setTimeout(TimeValue.timeValueMillis(500))
        .setTypes(documentType)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(fqb)
        .addSort("created", SortOrder.DESC)
        .setFrom(0).setSize(30)
        .execute()
        .actionGet(1000);
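
To make the "processing timeout, not connection timeout" point concrete, here is a minimal sketch that uses only the JDK and no Elasticsearch classes. The class name FutureTimeoutDemo and the slowQuery task are hypothetical stand-ins for a slow search. It suggests that a timed get() only bounds how long the caller waits; the submitted work keeps running after the TimeoutException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureTimeoutDemo {

    /**
     * Submits a task that takes taskMillis and waits for it at most waitMillis.
     * Returns {callerTimedOut, taskFinishedAnyway}.
     */
    static boolean[] run(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch finished = new CountDownLatch(1);
        try {
            // Hypothetical stand-in for a slow search request.
            Future<String> slowQuery = pool.submit(() -> {
                Thread.sleep(taskMillis);
                finished.countDown();
                return "hits";
            });
            boolean timedOut = false;
            try {
                slowQuery.get(waitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true; // only the wait was abandoned, not the work
            }
            // The task was never cancelled, so it should still complete.
            boolean completed = finished.await(taskMillis + 2000, TimeUnit.MILLISECONDS);
            return new boolean[] {timedOut, completed};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] r = run(300, 50);
        System.out.println("caller timed out: " + r[0] + ", task finished anyway: " + r[1]);
    }
}
```

The practical consequence is that a timed wait protects the caller's latency but not the resources behind the call, unless the future is also cancelled or the server enforces its own limit.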

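RestHighLevelClient query source code

Tracing the source of restHighLevelClient.search(searchRequest) shows that the only timeout parameter passed to the asynchronous listener in the end is maxRetryTimeoutMillis. The source is: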

public Response performRequest(String method, String endpoint, Map<String, String> params,
                                   HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                   Header... headers) throws IOException {
        SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
            listener, headers);
        return listener.get();
 }

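With this timeout set, an IOException message is printed, but in our tests the IOException was not thrown to the caller. Example of the printed message: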

java.io.IOException: listener timeout after waiting for [200] ms
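
The waiting behaviour behind that message can be illustrated with a simplified, JDK-only sketch. LatchListener below is a hypothetical stand-in, not the real SyncResponseListener; it assumes the real class blocks on a latch for maxRetryTimeoutMillis and reports a timeout with the message shown above.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class LatchListener {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<String> response = new AtomicReference<>();
    private final long timeoutMillis;

    LatchListener(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called by the (simulated) async I/O thread when a response arrives. */
    void onSuccess(String body) {
        response.set(body);
        latch.countDown();
    }

    /** Blocks the caller until a response arrives or the timeout elapses. */
    String get() throws IOException {
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IOException("listener timeout after waiting for [" + timeoutMillis + "] ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while waiting for the response", e);
        }
        return response.get();
    }

    /** Simulates a response arriving after responseDelayMillis; returns the body or the error message. */
    static String demo(long responseDelayMillis, long timeoutMillis) {
        LatchListener listener = new LatchListener(timeoutMillis);
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(responseDelayMillis);
                listener.onSuccess("ok");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.setDaemon(true);
        io.start();
        try {
            return listener.get();
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(1000, 200)); // response arrives too late
        System.out.println(demo(10, 2000));  // response arrives in time
    }
}
```

In this sketch the timeout covers the whole blocking wait on the caller side, which is why it behaves like a safety net rather than a per-connection or per-socket limit.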

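Solution

Following the principle of the Transport API described above, we add timeout control around the query calls of the high-level REST client. Roughly:
1. Add Spring asynchronous invocation.
2. Use a Java Future to wait for the result with a deadline, and handle the timeout case.
The AsyncResult code is as follows: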

// Runs the blocking search on the "taskExecutor" pool so the caller can wait with a deadline.
@Async("taskExecutor")
public Future<List<ProcessMessage>> searchProcessMessageListBySizeAsyn(String[] indexName, String[] typeName, Integer pageSize, BoolQueryBuilder boolQuery) {
    Assert.notNull(indexName, "the indexName string[] must not be null");
    Assert.notNull(typeName, "the typeName string[] must not be null");
    List<ProcessMessage> resultList = this.searchProcessMessageListBySize(indexName, typeName, pageSize, boolQuery);
    return new AsyncResult<>(resultList);
}

Code that reads the result from the Future:

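try {
    list = futureResult.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe it.
    Thread.currentThread().interrupt();
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    log.info("ES query exceeded the configured timeout: {}ms, returning an empty result.", timeout);
    list = new ArrayList<>();
}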
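
The same pattern can be packaged as a small helper. The sketch below uses only the JDK; TimedFutures and getOrEmpty are hypothetical names, and a plain ExecutorService stands in for Spring's taskExecutor. As a variation on the snippet above, it also cancels the task on timeout, so that a worker thread is not left occupied by a result nobody will read. Whether cancellation actually stops the underlying request depends on the task responding to interruption, which is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedFutures {

    /** Waits up to timeoutMillis for the result; returns an empty list on timeout or interruption. */
    static <T> List<T> getOrEmpty(Future<List<T>> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // ask the worker to stop; it is interrupted if still running
            return new ArrayList<>();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            future.cancel(true);
            return new ArrayList<>();
        } catch (ExecutionException e) {
            // A failed search is not a timeout, so surface it instead of hiding it.
            throw new IllegalStateException("search failed", e.getCause());
        }
    }

    /** Simulates a search that takes taskMillis and is awaited for timeoutMillis. */
    static List<String> demo(long taskMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<List<String>> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // stand-in for the blocking search call
                return Arrays.asList("doc-1", "doc-2");
            });
            return getOrEmpty(future, timeoutMillis);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("slow search returned " + demo(500, 100).size() + " docs");
        System.out.println("fast search returned " + demo(10, 2000).size() + " docs");
    }
}
```

Returning an empty list on timeout mirrors the behaviour in the article; a caller that needs to distinguish "no hits" from "timed out" would want a different return type.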

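Result:
When the overall query times out, execution enters the TimeoutException branch and logs the following:

ElasticSearchAPITest       : ES query exceeded the configured timeout: 1000ms, returning an empty result.
ElasticSearchAPITest       : get:0
ElasticSearchAPITest       : elapsed: 1015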

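Elasticsearch timeout settings

1. Transport API settings
Timeout for the client connecting to cluster nodes (client.transport.ping_timeout). It is passed through Settings in the same way as client.transport.sniff in the following snippet: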
Settings settings = Settings.builder().put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
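client.transport.ping_timeout: The time to wait for a ping response from a node. Defaults to 5s. If no response arrives within this time, the node is considered unavailable. If network latency between the client and the cluster is high or the connection is unstable, this value may need to be increased.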

Timeout in scroll
SearchResponse scrollResp = client.prepareSearch(test)
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
.setScroll(new TimeValue(60000))
.setQuery(qb)
.setSize(100).get();
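The time passed to setScroll enables a scroll with a timeout. In our tests this parameter turned out to be yet another "Schrödinger's parameter" with no observable effect, so it is better not to rely on it.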

2. RestHighLevel settings
Set the maximum retry timeout on RestClient:
RestClient.builder(hostList.toArray(new HttpHost[0])).setMaxRetryTimeoutMillis(1000);
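With this setting, API operations check the timeout and print the IOException message, but in our tests the main thread did not fail. The comment in the source reads: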

//providing timeout is just a safety measure to prevent everlasting waits
//the different client timeouts should already do their jobs