Elasticsearch Query Shard Selection

2019-07-09  b2499d9c833c

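For any single search request, ES picks one copy of each shard to run the query on.
For example, as Figure 1 shows, the index twitter has 3 primary shards, each with 2 replicas, for 9 shard copies in total. A search request is served by 3 of them, one per shard, and each of those may be either a primary or a replica. In other words, a search request hits exactly one copy of each shard.
Primary and replica shards hold, in theory, identical data, and a query uses only one copy of each shard, so adding replicas does not make a search faster through parallelism. With several copies available, however, ES may be able to pick the copy that can respond fastest under the current cluster state, which does make the search respond faster.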

Figure 1

Replica selection

A replica can be selected in several ways, all of which appear as branches of the following routing method:

private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                        DiscoveryNodes nodes, @Nullable String preference,
                                                        @Nullable ResponseCollectorService collectorService,
                                                        @Nullable Map<String, Long> nodeCounts) {
        if (preference == null || preference.isEmpty()) {
            if (awarenessAttributes.isEmpty()) {
                if (useAdaptiveReplicaSelection) {
                    return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                } else {
                    return indexShard.activeInitializingShardsRandomIt();
                }
            } else {
                return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
            }
        }
        if (preference.charAt(0) == '_') {
            Preference preferenceType = Preference.parse(preference);
            if (preferenceType == Preference.SHARDS) {
                // starts with _shards, so execute on specific ones
                int index = preference.indexOf('|');

                String shards;
                if (index == -1) {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1);
                } else {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
                }
                String[] ids = Strings.splitStringByCommaToArray(shards);
                boolean found = false;
                for (String id : ids) {
                    if (Integer.parseInt(id) == indexShard.shardId().id()) {
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    return null;
                }
                // no more preference
                if (index == -1 || index == preference.length() - 1) {
                    if (awarenessAttributes.isEmpty()) {
                        if (useAdaptiveReplicaSelection) {
                            return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
                        } else {
                            return indexShard.activeInitializingShardsRandomIt();
                        }
                    } else {
                        return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
                    }
                } else {
                    // update the preference and continue
                    preference = preference.substring(index + 1);
                }
            }
            preferenceType = Preference.parse(preference);
            switch (preferenceType) {
                case PREFER_NODES:
                    final Set<String> nodesIds =
                            Arrays.stream(
                                    preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                            ).collect(Collectors.toSet());
                    return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
                case LOCAL:
                    return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
                case PRIMARY:
                    deprecationLogger.deprecated("[_primary] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryActiveInitializingShardIt();
                case REPLICA:
                    deprecationLogger.deprecated("[_replica] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaActiveInitializingShardIt();
                case PRIMARY_FIRST:
                    deprecationLogger.deprecated("[_primary_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.primaryFirstActiveInitializingShardsIt();
                case REPLICA_FIRST:
                    deprecationLogger.deprecated("[_replica_first] has been deprecated in 6.1+, and will be removed in 7.0; " +
                        "use [_only_nodes] or [_prefer_nodes]");
                    return indexShard.replicaFirstActiveInitializingShardsIt();
                case ONLY_LOCAL:
                    return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
                case ONLY_NODES:
                    String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                    return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
                default:
                    throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
            }
        }
        // if not, then use it as the index
        int routingHash = Murmur3HashFunction.hash(preference);
        if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
            // The AllocationService lists shards in a fixed order based on nodes
            // so earlier versions of this class would have a tendency to
            // select the same node across different shardIds.
            // Better overall balancing can be achieved if each shardId opts
            // for a different element in the list by also incorporating the
            // shard ID into the hash of the user-supplied preference key.
            routingHash = 31 * routingHash + indexShard.shardId.hashCode();
        }
        if (awarenessAttributes.isEmpty()) {
            return indexShard.activeInitializingShardsIt(routingHash);
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
        }
    }
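
The `_shards` branch above is mostly string handling, so it can be sketched in isolation. The following is a minimal, self-contained illustration of how a preference such as `_shards:2,3|_local` splits into a shard id list and a remaining preference. The names `ShardsPreferenceSketch`, `Parsed` and `parse` are hypothetical and are not part of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

final class ShardsPreferenceSketch {
    /** Parsed form of a "_shards:..." preference (hypothetical helper type). */
    static final class Parsed {
        final List<Integer> shardIds;
        final String rest; // preference that follows '|', or null if there is none

        Parsed(List<Integer> shardIds, String rest) {
            this.shardIds = shardIds;
            this.rest = rest;
        }
    }

    static Parsed parse(String preference) {
        final String prefix = "_shards:";
        if (!preference.startsWith(prefix)) {
            throw new IllegalArgumentException("not a _shards preference: " + preference);
        }
        int bar = preference.indexOf('|');
        // The shard list ends at '|' if present, otherwise at the end of the string.
        String ids = bar == -1
                ? preference.substring(prefix.length())
                : preference.substring(prefix.length(), bar);
        List<Integer> shardIds = new ArrayList<>();
        for (String id : ids.split(",")) {
            shardIds.add(Integer.parseInt(id));
        }
        // "No more preference" when there is no '|' or nothing follows it.
        String rest = (bar == -1 || bar == preference.length() - 1)
                ? null
                : preference.substring(bar + 1);
        return new Parsed(shardIds, rest);
    }

    public static void main(String[] args) {
        Parsed p = parse("_shards:2,3|_local");
        System.out.println(p.shardIds + " then " + p.rest);
    }
}
```

In the routing method, a shard whose id is not in the list is skipped (the method returns null), and the remaining preference, if any, is then handled by the switch statement.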

Selecting a replica with the preference parameter

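Preference    Description
_primary    The operation runs only on primary shards.
_primary_first    The query runs on the primary shard first; if the primary is unavailable (down), it runs on a replica.
_replica    The operation runs only on replica shards.
_replica_first    The query runs on a replica first; if no replica is available (down), it runs on the primary shard.
_local    The query prefers shards held on the local node and falls back to other nodes if there are none.
_only_local    The query runs only on shards held on the local node.
_prefer_nodes:abc,xyz    The query prefers the listed nodes ('abc' or 'xyz' in this case).
_shards:2,3    Restricts the operation to the specified shards (2 and 3 in this case). This preference can be combined with other preferences, but it must appear first: _shards:2,3|_primary
_only_nodes:node1,node2    The query runs only on the nodes with the given ids. If such a node holds only some of the shards of the index, only those shards are searched. Separate multiple nodes with ",".
custom    Any value that does not start with _. If two searches give the same custom string as their preference and the underlying cluster state does not change, the same ordering of shards is used for both. This does not guarantee that exactly the same shards are used every time: the cluster state, and therefore the selected shards, may change for many reasons, including shard relocation and shard failure, and a node may sometimes reject a search, causing a fallback to another node. In practice, however, the ordering of shards tends to stay stable for long periods. Good candidates for a custom preference value are something like a web session id or a user name.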
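
To illustrate why a custom preference string keeps hitting the same copies, here is a minimal sketch of the last branch of the routing method: the string is hashed, the shard id is mixed in, and the list of copies is rotated by the result. This is a simplified model under stated assumptions: `String.hashCode()` stands in for Elasticsearch's Murmur3 hash, a plain int stands in for the shard id's hash code, and the class name `CustomPreferenceSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CustomPreferenceSketch {
    /** Rotates the copies by a distance derived from the preference string and the shard id. */
    static List<String> order(List<String> copies, String preference, int shardId) {
        if (copies.isEmpty()) {
            return copies;
        }
        // Assumption: String.hashCode() replaces the Murmur3 hash used by Elasticsearch.
        int routingHash = 31 * preference.hashCode() + shardId;
        // floorMod keeps the distance in [0, size) even when the hash is negative.
        int d = Math.floorMod(routingHash, copies.size());
        List<String> rotated = new ArrayList<>(copies.subList(d, copies.size()));
        rotated.addAll(copies.subList(0, d));
        return rotated;
    }

    public static void main(String[] args) {
        List<String> copies = Arrays.asList("primary", "replica-1", "replica-2");
        // The same string and the same list of copies always give the same order.
        System.out.println(order(copies, "session-42", 0));
        System.out.println(order(copies, "session-42", 0));
    }
}
```

Because the order depends only on the string, the shard id and the current list of copies, it changes only when the list itself changes, for example after a relocation or a shard failure.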

Note

Rack awareness

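If several virtual machines run on one physical machine and each runs an ES node, or if the cluster spans several racks or data centers, then multiple ES nodes may share the same physical machine, the same rack, or the same data center. Those nodes can then all fail together when that machine, rack, or data center has a problem.
If ES is aware of the physical layout of the hardware, it can ensure that a primary shard and its replica shards are allocated to different physical machines, racks, or data centers, which minimizes the risk from a machine, rack, or data center failure.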

Shard allocation awareness is how we tell ES about our hardware layout.

For example, with several racks, each node must be told at startup which rack it is on by giving it a rack_id, for instance with the command ./bin/elasticsearch -Enode.attr.rack_id=rack_one. The rack id can also be set in elasticsearch.yml:

cluster.routing.allocation.awareness.attributes: rack_id
node.attr.rack_id: rack_one

Of the two settings above, the first names the attribute used for awareness, and the second assigns this node's value for that attribute.

If two nodes are started on the same rack and an index with 5 primary shards and 5 replica shards is created, the shards are allocated across those two nodes.

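If two more nodes are then started on a different rack, ES moves shards onto the new nodes so that a primary shard and its replica do not sit on the same rack. If rack 2 then fails, however, ES still allocates all shards on rack 1 during recovery in order to restore the cluster.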

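Prefer-local-shard behavior: when shard awareness is enabled, ES tries to serve search and get requests from local shards, that is, shards in the same awareness group as the node handling the request. It prefers shards in the same rack or data center over crossing racks or data centers.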

Several awareness attributes can be specified, for example a rack id and a zone name:

cluster.routing.allocation.awareness.attributes: rack_id,zone

Forced awareness

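Suppose there are two data centers that together have enough hardware to hold all the shards, but each data center alone can hold only half of them. With plain awareness, if one data center fails, ES allocates all the shards that need recovering to the remaining data center, whose hardware cannot hold them all.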

Forced awareness solves this problem: it never allows all copies of a shard to be allocated within a single data center.

For example, with an awareness attribute named zone and two data centers, zone1 and zone2, consider the following configuration:

cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2


If 2 nodes are then started in zone1 and an index with 5 primary shards and 5 replica shards is created, only the 5 primary shards are allocated, all in zone1. The replica shards are allocated only once nodes are started in zone2.
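
The effect described above can be approximated with a small piece of arithmetic: if the copies of a shard must be spread evenly over the attribute values ES knows about, no value may hold more than the ceiling of copies divided by values. The sketch below is only a simplified model of that rule, not the allocation decider's actual code, and the names `ForcedAwarenessSketch` and `maxCopiesPerZone` are hypothetical.

```java
final class ForcedAwarenessSketch {
    /** Upper bound on copies of one shard within a single zone: ceil(copies / zones). */
    static int maxCopiesPerZone(int copiesPerShard, int knownZones) {
        return (copiesPerShard + knownZones - 1) / knownZones;
    }

    public static void main(String[] args) {
        int copies = 2; // one primary plus one replica

        // Plain awareness: only zones that currently have nodes are counted.
        System.out.println("plain, only zone1 alive: " + maxCopiesPerZone(copies, 1));

        // Forced awareness with values zone1,zone2: both zones always count,
        // even while zone2 has no nodes.
        System.out.println("forced, zone1 and zone2: " + maxCopiesPerZone(copies, 2));
    }
}
```

Under this model, plain awareness with a single live zone allows both copies in zone1, while forced awareness caps zone1 at one copy per shard, so the replicas stay unassigned until zone2 has nodes.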

Once awareness attributes are configured, replica selection by default prefers copies in the local rack.

        // Excerpt: the enclosing method signature is not shown here.
        final ArrayList<ShardRouting> to = new ArrayList<>();
        for (final String attribute : key.attributes) {
            final String localAttributeValue = nodes.getLocalNode().getAttributes().get(attribute);
            if (localAttributeValue != null) {
                for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
                    ShardRouting fromShard = iterator.next();
                    final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
                    if (discoveryNode == null) {
                        iterator.remove(); // node is not present anymore - ignore shard
                    } else if (localAttributeValue.equals(discoveryNode.getAttributes().get(attribute))) {
                        iterator.remove();
                        to.add(fromShard);
                    }
                }
            } 
        }
        return Collections.unmodifiableList(to);
    }
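
The excerpt above depends on Elasticsearch types, so here is a self-contained sketch of the same idea using plain collections: copies on nodes whose rack matches the local node's rack come first, followed by the rest. The names `PreferSameRackSketch` and `order` and the map-based node model are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PreferSameRackSketch {
    /**
     * Orders shard copies (identified here by the node that holds them) so that
     * copies in the local rack come first. Copies on unknown nodes are dropped.
     */
    static List<String> order(List<String> copyNodes, Map<String, String> rackByNode, String localRack) {
        List<String> sameRack = new ArrayList<>();
        List<String> others = new ArrayList<>();
        for (String node : copyNodes) {
            if (!rackByNode.containsKey(node)) {
                continue; // node is not present anymore: ignore the copy
            }
            if (localRack.equals(rackByNode.get(node))) {
                sameRack.add(node);
            } else {
                others.add(node);
            }
        }
        sameRack.addAll(others);
        return sameRack;
    }

    public static void main(String[] args) {
        Map<String, String> racks = new HashMap<>();
        racks.put("n1", "rack_one");
        racks.put("n2", "rack_two");
        racks.put("n3", "rack_one");
        List<String> copies = new ArrayList<>();
        copies.add("n2");
        copies.add("n1");
        copies.add("n3");
        System.out.println(order(copies, racks, "rack_one"));
    }
}
```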

Default replica selection

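In versions before 7.0, searching starts from a random seed value, and subsequent requests then round-robin across the copies of each shard.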

    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * its random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsRandomIt() {
        return activeInitializingShardsIt(shuffler.nextSeed());
    }
 
    @Override
    public int nextSeed() {
        return seed.getAndIncrement();
    }
    @Override
    public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) {
        return CollectionUtils.rotate(shards, seed);
    }
    /**
     * Returns an iterator over active and initializing shards. Making sure though that
     * its random within the active shards, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsIt(int seed) {
        // make sure active shards come before initializing ones
        if (allInitializingShards.isEmpty()) {
            return new PlainShardIterator(shardId, shuffler.shuffle(activeShards, seed));
        }
        ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
        ordered.addAll(shuffler.shuffle(activeShards, seed));
        ordered.addAll(allInitializingShards);
        return new PlainShardIterator(shardId, ordered);
    }
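    /**
     * Return a rotated view of the given list with the given distance.
     */
    // the seed increments on every request; taken modulo the list size, it makes successive requests rotate through the shard copies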
    public static <T> List<T> rotate(final List<T> list, int distance) {
        if (list.isEmpty()) {
            return list;
        }

        int d = distance % list.size();
        if (d < 0) {
            d += list.size();
        }

        if (d == 0) {
            return list;
        }

        return new RotatedList<>(list, d);
    }
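
Put together, the incrementing seed and the rotation give round-robin behavior. The following self-contained sketch shows this with plain lists. It copies the list instead of returning a rotated view, and the class name `RotationSketch` and its fixed starting seed are assumptions made so the output is predictable (the text above says the real starting value is random).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RotationSketch {
    private final AtomicInteger seed;

    RotationSketch(int initialSeed) {
        this.seed = new AtomicInteger(initialSeed);
    }

    /** Returns a copy of the list rotated so that element (distance mod size) comes first. */
    static <T> List<T> rotate(List<T> list, int distance) {
        if (list.isEmpty()) {
            return list;
        }
        int d = distance % list.size();
        if (d < 0) {
            d += list.size();
        }
        List<T> rotated = new ArrayList<>(list.subList(d, list.size()));
        rotated.addAll(list.subList(0, d));
        return rotated;
    }

    /** Each call advances the seed, so the first copy in the result changes per request. */
    List<String> next(List<String> copies) {
        return rotate(copies, seed.getAndIncrement());
    }

    public static void main(String[] args) {
        RotationSketch selector = new RotationSketch(0);
        List<String> copies = Arrays.asList("primary", "replica-1", "replica-2");
        for (int request = 0; request < 4; request++) {
            System.out.println(selector.next(copies).get(0));
        }
    }
}
```

With a starting seed of 0, the four requests are served first by primary, replica-1, replica-2, and then primary again.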

Suppose there are three shard copies whose requests have the following response latencies:

Adaptive replica selection

Our ARS implementation is based on a formula where, for each search request, 
Elasticsearch ranks each copy of the shard to determine which is likeliest to be the "best" 
copy to send the request to. Instead of sending requests in a round-robin fashion to each 
copy of the shard, Elasticsearch selects the "best" copy and routes the request there.

The ARS formula initially seems complex, but let's break it down:

Ψ(s) = R(s) - 1/µ̄(s) + (q̂(s))^3 / µ̄(s)

Where q̂(s) is:

q̂(s) = 1 + (os(s) * n) + q(s)

        private double innerRank(long outstandingRequests) {
            // the concurrency compensation is defined as the number of
            // outstanding requests from the client to the node times the number
            // of clients in the system
            double concurrencyCompensation = outstandingRequests * clientNum;

            // Cubic queue adjustment factor. The paper chose 3 though we could
            // potentially make this configurable if desired.
            int queueAdjustmentFactor = 3;

            // EWMA of queue size
            double qBar = queueSize;
            double qHatS = 1 + concurrencyCompensation + qBar;

            // EWMA of response time
            double rS = responseTime / FACTOR;
            // EWMA of service time
            double muBarS = serviceTime / FACTOR;

            // The final formula
            double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS);
            return rank;
        }

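And looking at the individual pieces: going by the code comments above, os(s) is the number of outstanding requests to node s, n is the number of clients in the system, and q(s), R(s) and µ̄(s) are the exponentially weighted moving averages (EWMA) of queue size, response time and service time. These are tracked per node in: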

private static class NodeStatistics {
        final String nodeId;
        final ExponentiallyWeightedMovingAverage queueSize;
        final ExponentiallyWeightedMovingAverage responseTime;
        double serviceTime;

        NodeStatistics(String nodeId,
                       ExponentiallyWeightedMovingAverage queueSizeEWMA,
                       ExponentiallyWeightedMovingAverage responseTimeEWMA,
                       double serviceTimeEWMA) {
            this.nodeId = nodeId;
            this.queueSize = queueSizeEWMA;
            this.responseTime = responseTimeEWMA;
            this.serviceTime = serviceTimeEWMA;
        }
    }

The values computed for each node appear in the adaptive_selection section of the nodes stats output:


"adaptive_selection": {
                "5BN2QxfZQ3yzotzQhUXzlg": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 2976073,
                    "avg_response_time_ns": 3396261,
                    "rank": "3.4"
                },
                "eAznL5r5RreHLIU16XpczA": {
                    "outgoing_searches": 0,
                    "avg_queue_size": 0,
                    "avg_service_time_ns": 8884750,
                    "avg_response_time_ns": 15520622,
                    "rank": "15.5"
                }
            }
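
As a cross-check of the formula against the sample statistics above, the following sketch recomputes the rank from the raw numbers. It assumes FACTOR converts nanoseconds to milliseconds (1e6), which is consistent with the sample values, and the class name `ArsRankSketch` and the clientNum value of 1 are assumptions for illustration.

```java
import java.util.Locale;

final class ArsRankSketch {
    // Assumption: FACTOR in the excerpt converts nanoseconds to milliseconds.
    private static final double NS_PER_MS = 1_000_000.0;

    static double rank(long outstandingRequests, int clientNum, double queueSize,
                       double responseTimeNanos, double serviceTimeNanos) {
        // q̂(s) = 1 + (os(s) * n) + q(s)
        double qHatS = 1 + outstandingRequests * clientNum + queueSize;
        double rS = responseTimeNanos / NS_PER_MS;
        double muBarS = serviceTimeNanos / NS_PER_MS;
        // Ψ(s) = R(s) - 1/µ̄(s) + (q̂(s))^3 / µ̄(s)
        return rS - (1.0 / muBarS) + (Math.pow(qHatS, 3) / muBarS);
    }

    public static void main(String[] args) {
        double first = rank(0, 1, 0, 3396261, 2976073);
        double second = rank(0, 1, 0, 15520622, 8884750);
        System.out.println(String.format(Locale.ROOT, "%.1f", first));
        System.out.println(String.format(Locale.ROOT, "%.1f", second));

        // A queue of 2 on the first node makes the cubic term dominate.
        System.out.println(rank(0, 1, 2, 3396261, 2976073) > first);
    }
}
```

With no outstanding searches and an empty queue, q̂(s) is 1, the two service-time terms cancel, and the rank reduces to the response time in milliseconds. The two values round to 3.4 and 15.5, matching the rank fields in the sample output.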

Comparison without load

As the benchmark chart shows, ARS still improves throughput and reduces latency even when the cluster is under no load.

A single node under load

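When one data node is under heavy load, throughput improves considerably while median latency rises somewhat. Routing around the heavily loaded node puts more load on the less-stressed nodes, which increases their latency.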


private static List<ShardRouting> rankShardsAndUpdateStats(List<ShardRouting> shards, final ResponseCollectorService collector,
                                                               final Map<String, Long> nodeSearchCounts) {
        if (collector == null || nodeSearchCounts == null || shards.size() <= 1) {
            return shards;
        }

        // Retrieve which nodes we can potentially send the query to
        final Set<String> nodeIds = getAllNodeIds(shards);
        final int nodeCount = nodeIds.size();

        final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats = getNodeStats(nodeIds, collector);

        // Retrieve all the nodes the shards exist on
        final Map<String, Double> nodeRanks = rankNodes(nodeStats, nodeSearchCounts);

        // sort all shards based on the shard rank
        ArrayList<ShardRouting> sortedShards = new ArrayList<>(shards);
        Collections.sort(sortedShards, new NodeRankComparator(nodeRanks));

        // adjust the non-winner nodes' stats so they will get a chance to receive queries
        if (sortedShards.size() > 1) {
            ShardRouting minShard = sortedShards.get(0);
            // If the winning shard is not started we are ranking initializing
            // shards, don't bother to do adjustments
            if (minShard.started()) {
                String minNodeId = minShard.currentNodeId();
                Optional<ResponseCollectorService.ComputedNodeStats> maybeMinStats = nodeStats.get(minNodeId);
                if (maybeMinStats.isPresent()) {
                    adjustStats(collector, nodeStats, minNodeId, maybeMinStats.get());
                    // Increase the number of searches for the "winning" node by one.
                    // Note that this doesn't actually affect the "real" counts, instead
                    // it only affects the captured node search counts, which is
                    // captured once for each query in TransportSearchAction
                    nodeSearchCounts.compute(minNodeId, (id, conns) -> conns == null ? 1 : conns + 1);
                }
            }
        }

        return sortedShards;
    }

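That is, to keep some nodes from never serving requests, after each selection the search count of the chosen node is incremented by one and the statistics of the nodes that were not chosen are adjusted.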

/**
     * Adjust the for all other nodes' collected stats. In the original ranking paper there is no need to adjust other nodes' stats because
     * Cassandra sends occasional requests to all copies of the data, so their stats will be updated during that broadcast phase. In
     * Elasticsearch, however, we do not have that sort of broadcast-to-all behavior. In order to prevent a node that gets a high score and
     * then never gets any more requests, we must ensure it eventually returns to a more normal score and can be a candidate for serving
     * requests.
     *
     * This adjustment takes the "winning" node's statistics and adds the average of those statistics with each non-winning node. Let's say
     * the winning node had a queue size of 10 and a non-winning node had a queue of 18. The average queue size is (10 + 18) / 2 = 14 so the
     * non-winning node will have statistics added for a queue size of 14. This is repeated for the response time and service times as well.
     */
    private static void adjustStats(final ResponseCollectorService collector,
                                    final Map<String, Optional<ResponseCollectorService.ComputedNodeStats>> nodeStats,
                                    final String minNodeId,
                                    final ResponseCollectorService.ComputedNodeStats minStats) {
        if (minNodeId != null) {
            for (Map.Entry<String, Optional<ResponseCollectorService.ComputedNodeStats>> entry : nodeStats.entrySet()) {
                final String nodeId = entry.getKey();
                final Optional<ResponseCollectorService.ComputedNodeStats> maybeStats = entry.getValue();
                if (nodeId.equals(minNodeId) == false && maybeStats.isPresent()) {
                    final ResponseCollectorService.ComputedNodeStats stats = maybeStats.get();
                    final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2;
                    final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2;
                    final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2;
                    collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService);
                }
            }
        }
    }
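
The effect of this adjustment over time can be sketched numerically. Each round feeds the average of the winner's value and the non-winner's current value into the non-winner's moving average, so the non-winner drifts toward the winner. The sketch below uses doubles rather than the int and long arithmetic of the excerpt, assumes an EWMA smoothing factor of 0.3, and uses the hypothetical class name `AdjustStatsSketch`.

```java
final class AdjustStatsSketch {
    // Assumption: smoothing factor of the exponentially weighted moving average.
    static final double ALPHA = 0.3;

    /** The statistic added for a non-winning node: the mean of both nodes' values. */
    static double average(double winner, double other) {
        return (winner + other) / 2;
    }

    /** Standard EWMA update: blend the new sample into the current value. */
    static double ewma(double current, double sample) {
        return ALPHA * sample + (1 - ALPHA) * current;
    }

    /** Applies the adjustment for the given number of rounds, with the winner's value held fixed. */
    static double adjustRepeatedly(double winner, double other, int rounds) {
        double value = other;
        for (int i = 0; i < rounds; i++) {
            value = ewma(value, average(winner, value));
        }
        return value;
    }

    public static void main(String[] args) {
        // Queue sizes from the comment above: winner 10, non-winner 18.
        System.out.println(average(10, 18));
        for (int rounds : new int[] {1, 5, 20}) {
            System.out.println(rounds + " rounds: " + adjustRepeatedly(10, 18, rounds));
        }
    }
}
```

The first sample added is 14, as in the comment above, and after enough rounds the non-winner's queue estimate approaches the winner's 10, at which point it can compete for requests again.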

https://elasticsearch.cn/article/334
https://juejin.im/post/5b83b1d5e51d4538da22ef50
https://doc.yonyoucloud.com/doc/mastering-elasticsearch/chapter-4/45_README.html
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/allocation-awareness.html
http://zh1cheung.com/zhi1cheung.github.io/elk/2018/10/02/elk/
https://pdfs.semanticscholar.org/99c7/f437d672abf56fdc9438c0c46a7ef716e8c7.pdf
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-preference.html
https://jobs.zalando.com/tech/blog/a-closer-look-at-elasticsearch-express/?gh_src=4n3gxh1
https://www.elastic.co/blog/improving-response-latency-in-elasticsearch-with-adaptive-replica-selection
https://www.elastic.co/guide/cn/elasticsearch/guide/current/_search_options.html
