Elasticsearch Deep Dive

Elasticsearch RestHighLevelClient

2020-06-11  zhenxianyimeng

When developing ES applications in Java today, we usually send requests through RestHighLevelClient. In the early days, before RestHighLevelClient existed, requests were sent directly via the TransportClient.

1. ES Request Flow

First, let's walk through the full path of an HTTP request from the client into the ES cluster.
1) When the request reaches a cluster node, it is accepted by Netty4HttpServerTransport and handed through the RequestHandler to the RestController, which then dispatches the HTTP request to the Action registered for that route.
2) The selected TransportXXXAction checks whether the shard targeted by the request lives on the current node. If it does, Lucene is queried directly; if not, the request must be forwarded (a conceptual sketch of this decision follows the list).
3) Forwarding between nodes goes over Netty4Transport, which listens on port 9300 by default; you can think of it as Elasticsearch's internal RPC.
4) After the request arrives at node2, the corresponding XXXHandler processes it and queries node2's Lucene.
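
To make the forward-or-serve decision in steps 2 and 3 concrete, here is a deliberately simplified sketch. None of these names are Elasticsearch's real internal classes; they only illustrate the routing idea:

import java.util.Map;

// Illustrative only -- not Elasticsearch's actual internals.
class ShardRoutingSketch {
    private final String localNodeId;
    private final Map<Integer, String> shardOwners; // shardId -> id of the node holding it

    ShardRoutingSketch(String localNodeId, Map<Integer, String> shardOwners) {
        this.localNodeId = localNodeId;
        this.shardOwners = shardOwners;
    }

    String handle(int shardId) {
        String owner = shardOwners.get(shardId);
        if (localNodeId.equals(owner)) {
            return "query local Lucene";                        // step 2: shard is local
        }
        return "forward over transport (9300) to " + owner;     // step 3: internal RPC
    }
}

// new ShardRoutingSketch("node1", Map.of(0, "node1", 1, "node2")).handle(1)
// -> "forward over transport (9300) to node2"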



2. Request Flow in RestHighLevelClient

2.1 Creating the Client

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder builder = RestClient.builder(httpHost);

// Basic-auth credentials for the cluster
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "123456"));

// Request-level settings, e.g. the connect timeout
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
        requestConfigBuilder.setConnectTimeout(3000);
        return requestConfigBuilder;
    }
});

// HTTP client settings: connection-pool size and authentication
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
        httpAsyncClientBuilder.setMaxConnTotal(30);
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpAsyncClientBuilder;
    }
});

RestHighLevelClient client = new RestHighLevelClient(builder);

As the code above shows, when creating the client you can specify the target nodes, authentication, request timeouts, the HTTP thread pool, and other parameters. This example uses only one node; to use several, pass multiple HttpHost instances when building.
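
A minimal multi-node sketch (the hostnames below are placeholders):

RestClientBuilder builder = RestClient.builder(
        new HttpHost("es-node1", 9200, "http"),
        new HttpHost("es-node2", 9200, "http"),
        new HttpHost("es-node3", 9200, "http"));
RestHighLevelClient client = new RestHighLevelClient(builder);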

2.2 Building the Request

Let's use the Search request as an example. As with search, the client offers both a synchronous and an asynchronous variant for most operations; the pair is sketched below.
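
For reference, the synchronous and asynchronous search entry points in a 6.4+ client look roughly like this (signatures quoted from memory, so treat the exact shapes as an assumption):

// Synchronous: blocks the calling thread until the response (or an exception) arrives
public SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException

// Asynchronous: returns immediately; the listener fires when the request completes
public void searchAsync(SearchRequest searchRequest, RequestOptions options,
                        ActionListener<SearchResponse> listener)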



The most important argument is the SearchRequest, which carries the target indices, the query conditions, and so on.

SearchRequest has two key parts: indices, which names the indices to query, and a SearchSourceBuilder, which expresses the query itself.

A big reason to use RestHighLevelClient is that its API maps almost one-to-one onto the ES DSL. You can think of SearchSourceBuilder as the outermost {} of the DSL; its member variables (QueryBuilder, fetchSourceContext, aggregations, and so on) mirror the DSL grammar in much the same way.

Once the SearchRequest is built, calling toString() on it produces the DSL. You can also tune a query as raw DSL in Kibana or a similar tool and then pass that DSL directly when searching, but that makes the query logic harder to maintain, so it is not recommended.
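
A minimal construction sketch tying this together (the index name, field, and value are made up; the RequestOptions overload assumes a 6.4+ client):

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

SearchRequest searchRequest = new SearchRequest("my-index");     // hypothetical index
SearchSourceBuilder source = new SearchSourceBuilder();          // the outermost {} of the DSL
source.query(QueryBuilders.termQuery("status", "published"));    // hypothetical field/value
source.from(0).size(10);
searchRequest.source(source);

System.out.println(source.toString());   // prints the equivalent DSL as JSON
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);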

2.3 Sending the Request

Executing the search method eventually lands in RestClient#performRequest:

public Response performRequest(Request request) throws IOException {
    // Wrap a blocking listener around the asynchronous path...
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(request, listener);
    // ...and block the calling thread until the response (or failure) arrives
    return listener.get();
}

This method shows that whether we call the synchronous or the asynchronous variant externally, the client processes every request asynchronously internally. That makes the thread-pool configuration from 2.1 critical: the pool size should be chosen to match the workload.
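
Because the synchronous path is just a blocking wrapper, the asynchronous variant hands your callback straight to the same machinery. A sketch (again assuming the 6.4+ overload, reusing the searchRequest from 2.2):

import org.elasticsearch.action.ActionListener;

client.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse response) {
        // invoked on an I/O dispatcher thread from the pool configured in 2.1
    }

    @Override
    public void onFailure(Exception e) {
        // invoked if the request ultimately fails
    }
});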

Now for the core of this code, performRequestAsyncNoCatch(request, listener). After some parameter validation and request wrapping, it reaches:

performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
        request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);

The key call here is nextNode(). Let's see what it does:

/**
 * Returns a non-empty {@link Iterator} of nodes to be used for a request
 * that match the {@link NodeSelector}.
 * <p>
 * If there are no living nodes that match the {@link NodeSelector}
 * this will return the dead node that matches the {@link NodeSelector}
 * that is closest to being revived.
 * @throws IOException if no nodes are available
 */
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
    NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
    Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
    return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}

First, a few member variables of RestClient, which are the inputs to selectNodes:

private final AtomicInteger lastNodeIndex = new AtomicInteger(0);  // counter for the previously used node; drives the round-robin rotation
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();  // per-host dead state: which nodes are currently unreachable
private final NodeSelector nodeSelector; // node selector, used to filter candidate nodes (load balancing)
private volatile NodeTuple<List<Node>> nodeTuple; // all coordinating nodes configured on this client

With these variables in mind, let's look at the selectNodes method:

/**
 * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
 * if the previous attempt failed and so on. Package private for testing.
 */
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                  AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
    /*
     * Sort the nodes into living and dead lists.
     */
    List<Node> livingNodes = new ArrayList<>(nodeTuple.nodes.size() - blacklist.size());
    List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
    for (Node node : nodeTuple.nodes) { //1
        DeadHostState deadness = blacklist.get(node.getHost());
        if (deadness == null) {
            livingNodes.add(node);
            continue;
        }
        if (deadness.shallBeRetried()) {
            livingNodes.add(node);
            continue;
        }
        deadNodes.add(new DeadNode(node, deadness));
    }

    if (false == livingNodes.isEmpty()) {//2
        /*
         * Normal state: there is at least one living node. If the
         * selector is ok with any over the living nodes then use them
         * for the request.
         */
        List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
        nodeSelector.select(selectedLivingNodes);
        if (false == selectedLivingNodes.isEmpty()) {
            /*
             * Rotate the list using a global counter as the distance so subsequent
             * requests will try the nodes in a different order.
             */
            Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
            return selectedLivingNodes;
        }
    }

    /*
     * Last resort: there are no good nodes to use, either because
     * the selector rejected all the living nodes or because there aren't
     * any living ones. Either way, we want to revive a single dead node
     * that the NodeSelectors are OK with. We do this by passing the dead
     * nodes through the NodeSelector so it can have its say in which nodes
     * are ok. If the selector is ok with any of the nodes then we will take
     * the one in the list that has the lowest revival time and try it.
     */
    if (false == deadNodes.isEmpty()) {//3
        final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
        /*
         * We'd like NodeSelectors to remove items directly from deadNodes
         * so we can find the minimum after it is filtered without having
         * to compare many things. This saves us a sort on the unfiltered
         * list.
         */
        nodeSelector.select(new Iterable<Node>() {
            @Override
            public Iterator<Node> iterator() {
                return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
            }
        });
        if (false == selectedDeadNodes.isEmpty()) {
            return singletonList(Collections.min(selectedDeadNodes).node);
        }
    }
    throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
            + "living " + livingNodes + " and dead " + deadNodes);
}

(1) The first step iterates over all configured nodes and checks each against the blacklist. A node with no blacklist entry goes straight into livingNodes; a blacklisted node is given another chance (added to livingNodes) if enough time has passed since it was marked dead, otherwise it lands in deadNodes. The blacklist itself is updated after every request according to its outcome.
(2) The second step checks whether livingNodes is non-empty. If so, the node selector filters it and the list is rotated by a global counter, so that consecutive requests start from different nodes (a standalone demo of this rotation follows below).
(3) If livingNodes is empty, deadNodes is checked. If it is non-empty, the dead node closest to being retried is picked as the target, on the theory that a long-shot attempt beats failing outright.
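
The rotation in step (2) is easy to see in isolation; here is a minimal standalone demo of the same round-robin idea:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Standalone demo of the Collections.rotate-based round-robin in selectNodes
AtomicInteger lastNodeIndex = new AtomicInteger(0);
List<String> livingNodes = Arrays.asList("node1", "node2", "node3");
for (int i = 0; i < 3; i++) {
    List<String> ordered = new ArrayList<>(livingNodes);
    Collections.rotate(ordered, lastNodeIndex.getAndIncrement());
    System.out.println(ordered);   // the first element is the node tried first
}
// [node1, node2, node3]
// [node3, node1, node2]
// [node2, node3, node1]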

3. Summary

RestHighLevelClient looks simple at first glance, but there is quite a lot of intricate logic inside; it is worth digging into if you are interested.
