opensearch分片分配和rebalance分析

2023-05-04  本文已影响0人  以梦为马驾驾驾

分片分配就是把一个分片指派到集群中某个节点的过程。分配决策由主节点完成,分配决策包含两方面:
· 哪些分片应该分配给哪些节点;
· 哪个分片作为主分片,哪些作为副分片。

参考:

cluster-level: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/modules-cluster.html
index-level: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/shard-allocation-filtering.html

设置

Besides these, there are a few other miscellaneous cluster-level settings.

以上分别是: 集群级别的分配设置, 磁盘水位based, 分片感知和强制感知分配, 集群级别的shard过滤
除此之外, 还有 index 级别的shard分配过滤

参考: https://www.cnblogs.com/memento/p/14494010.html 这个博客把设置和说明都写清楚了

源码分析

集群在重启的时候, 选主完成之后, 会触发一次全量的分配. 除此之外, 在新建索引, 以及节点上下线等情况下, 也会触发一些index 的shard的分配. 虽然分配的过程和策略都不太相同, 但总体都抽象封装在了两个组件中:

a 新建索引

allocators只负责统计每个node的分片个数, 然后按照分片个数 排序, 只是个数, 不是负载 , 然后由deciders组件挨个遍历node, 判断索引的分片是否可以放在node上, 此时 deciders可以关心负载, 规则, 磁盘容量等等

b 已有索引

区分主分片还是副分片
对于主: allocators只允许把主分片指定在已经拥有该分片完整数据的节点上。
对于副: ,allocators则是先判断其他节点上是否已有该分片的数据的副本(即便数据不是最新的)

触发时机:

· index增删;
· node增删;
· 手工reroute;
· rebalancing;
· replica数量改变;
· 集群重启。

分片分配以及rebalance的动作基本都发生在AllocationService 中, AllocationService 属于会对整个集群的状态有影响的模块, 属于ClusterModule.

AllocationService 依赖于:

最重要: ShardsAllocator ExistingShardsAllocator AllocationDeciders

allocator

· primaryShardAllocator:找到那些拥有某分片最新数据的节点;
· replicaShardAllocator:找到磁盘上拥有这个分片数据的节点;
· BalancedShardsAllocator:找到拥有最少分片个数的节点。

继承关系:
image.png image.png
依赖关系:
image.png

deciders 以及 分类

子类通过自定义实现接口来实现不同策略.
canRebalance
canAllocate
canRemain
canForceAllocatePrimary

各种子类的策略可归为以下几类:

最终有四种结果:

    public static final Decision ALWAYS = new Single(Type.YES);
    public static final Decision YES = new Single(Type.YES);
    public static final Decision NO = new Single(Type.NO);
    public static final Decision THROTTLE = new Single(Type.THROTTLE);
org.opensearch.cluster.routing.allocation.decider.AllocationDecider.png

reroute 流程分析

外部调用触发: org.opensearch.cluster.routing.allocation.AllocationService#reroute(org.opensearch.cluster.ClusterState, org.opensearch.cluster.routing.allocation.command.AllocationCommands, boolean, boolean)

内部接口调用: org.opensearch.cluster.routing.allocation.AllocationService#reroute(org.opensearch.cluster.ClusterState, java.lang.String)

内部接口调用

在集群状态/设置/重启等情况下可能会被调用.
以集群重启, 元数据恢复中的shard恢复为例:

AllocationService.reroute对一个或多个主分片或副分片执行分配,分配以后产生新的集群状态。Master节点将新的集群状态广播下去,触发后续的流程。对于内部模块调用,返回值为新产生的集群状态,对于手工执行的reroute命令,返回命令执行结果。

    public ClusterState reroute(ClusterState clusterState, String reason) {
        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); // reroute不仅care 分片,还有副本
        // 获取可变的 RoutingNodes, 关于 RoutingNodes 和 RoutingTable的区别: 见: ...todo
        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); // 从RoutingTable开始构建, 详情见方法内部
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(//持有routingNodes中shard的allocation状态,以及负责allocation的decider
            allocationDeciders,
            routingNodes,
            fixedClusterState,
            clusterInfoService.getClusterInfo(),
            snapshotsInfoService.snapshotShardSizes(),
            currentNanoTime()
        );
        reroute(allocation); // 核心逻辑
        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { // 没有变化, 说明需要reroute的
            return clusterState;
        } // 有变化, 构建reroute的结果
        return buildResultAndLogHealthChange(clusterState, allocation, reason); // 返回新的ClusterState, 新的ClusterState,
    }       // 看调用者怎么处理新状态,一般会广播, 进入二阶段提交, 然后data节点就可以learn了

构建RoutingNodes的过程详解:

    public RoutingNodes(ClusterState clusterState, boolean readOnly) {
        this.readOnly = readOnly;
        final RoutingTable routingTable = clusterState.routingTable(); // 从RoutingTable开始构建

        // fill in the nodeToShards with the "live" nodes
        for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
            String nodeId = cursor.value.getId();
            this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId)));
        } // 构建node <-> RoutingNodes(空的, 还没有数据)
        // 开始填充
        // fill in the inverse of node -> shards allocated
        // also fill replicaSet information // indexRoutingTable代表每个index, indexShard代表shard
        for (ObjectCursor<IndexRoutingTable> indexRoutingTable : routingTable.indicesRouting().values()) {
            for (IndexShardRoutingTable indexShard : indexRoutingTable.value) {
                assert indexShard.primary != null;
                for (ShardRouting shard : indexShard) { // 三重循环, 终于达到shard级别
                    // to get all the shards belonging to an index, including the replicas,
                    // we define a replica set and keep track of it. A replica set is identified
                    // by the ShardId, as this is common for primary and replicas.
                    // A replica Set might have one (and not more) replicas with the state of RELOCATING.
                    if (shard.assignedToNode()) {//找到shard所在的node,如果shard存在nodeId,但是nodeId不存在于nodesToShards, 添加
                        RoutingNode routingNode = this.nodesToShards.computeIfAbsent( // 不存在于nodesToShards的情况是
                            shard.currentNodeId(), // 可能节点下线, 之前在routingTable里, 但是现在不在了?(不过在集群重启恢复这个场景不存在)
                            k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId()))
                        ); // 现在构建的是 node <-> shard allocated, 哪怕下线的node, 也应该有信息,只要它有shard存在
                        routingNode.add(shard);
                        assignedShardsAdd(shard); // 标识
                        if (shard.relocating()) { // 被分配的shard正在被relocating, 现在它本身是 source
                            relocatingShards++; // 更新状态
                            // Add the counterpart shard with relocatingNodeId reflecting the source from which
                            // it's relocating from.
                            routingNode = nodesToShards.computeIfAbsent(
                                shard.relocatingNodeId(), // 当下获取的是 source node 的id, 因为它现在的状态是 RELOCATING,本身是source
                                k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId()))
                            );
                            ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
                            addInitialRecovery(targetShardRouting, indexShard.primary); // 双向构建, 就像图的边的两个节点
                            routingNode.add(targetShardRouting);  // 不过上面这行是在增加节点的 recovery的出度, 即此主shard正在被多少个副shard复制
                            assignedShardsAdd(targetShardRouting);
                        } else if (shard.initializing()) { // 可能是primary or replica
                            if (shard.primary()) {
                                inactivePrimaryCount++; // 更新不活跃
                            }
                            inactiveShardCount++;
                            addInitialRecovery(shard, indexShard.primary);
                        }
                    } else {
                        unassignedShards.add(shard); // 更新
                    }
                }
            }
        }
    

调用内部的reroute逻辑:

  1. 清除delay超时的shard, 让他们可以被重新分配
  2. 优先分配已经存在数据的shard
    1. 判断优先级, 必然先回复高优的
    2. 做 已存在数据的shardallocator的前置动作, 如清理缓存, 避免影响到判断
    3. 先分配主shard
    4. 然后为replica分配做准备, 如果有副本已经在initializing了, 即有数据了, 最好还是把它放在这个节点上, 不然, 如果现在迁走, 会有新的复制工作. 为什么主不用这个操作? 因为主的allocation, 其实是在找已经有valid 数据的节点.
    5. 分配replica
  3. 然后调用ShardsAllocator, 为了集群的平衡
  4. 最后返回, 如果做了reroute的操作, buildResultAndLogHealthChange 会将新的ClusterState构建出来, 并且进行二阶段提交, 然后data节点就可以learn了
    private void reroute(RoutingAllocation allocation) {
        assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
        assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
            : "auto-expand replicas out of sync with number of nodes in the cluster";
        assert assertInitialized();

        removeDelayMarkers(allocation); // 如果有shard 因为 node 离开的delay分配而超时, 去掉delay标识,表示可以重新分配了
        // 首先, 优先分配已经存在 shard 副本的,
        allocateExistingUnassignedShards(allocation);  // try to allocate existing shard copies first
        shardsAllocator.allocate(allocation); // ShardsAllocator主要是为了集群的平衡
        assert RoutingNodes.assertShardStats(allocation.routingNodes());
    }

    private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
        allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
        // 按照优先级 "index.priority" 优先恢复高优的
        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.beforeAllocation(allocation);//各个不同的ExistingShardsAllocators在allocation前要做的准备动作.
        } // 如 GatewayAllocation 会到验证清理一些缓存
        // allocate 主
        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
        while (primaryIterator.hasNext()) {
            final ShardRouting shardRouting = primaryIterator.next();
            if (shardRouting.primary()) { // 调用背后的  primary allocator
                getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
            }
        }

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); // 为了 replica allocation 做准备
        } // 例如: Gateway的策略是, 如果当前有正在被恢复的副本, 则也许可以跳过它的allocate, 这样可以减少复制

        final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
        while (replicaIterator.hasNext()) {
            final ShardRouting shardRouting = replicaIterator.next();
            if (shardRouting.primary() == false) { // 调用背后的 replicas  allocator
                getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读