Analysis of OpenSearch shard allocation and rebalance
Shard allocation is the process of assigning a shard to some node in the cluster. Allocation decisions are made by the master node and cover two aspects:
· which shards should be allocated to which nodes;
· which shard copy acts as the primary and which ones act as replicas.
References:
cluster-level: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/modules-cluster.html
index-level: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/shard-allocation-filtering.html
Settings
- Cluster-level shard allocation settings control allocation and rebalancing operations.
- Disk-based shard allocation settings explains how Elasticsearch takes available disk space into account, and the related settings.
- Shard allocation awareness and Forced awareness control how shards can be distributed across different racks or availability zones.
- Cluster-level shard allocation filtering allows certain nodes or groups of nodes to be excluded from allocation so that they can be decommissioned.
Besides these, there are a few other miscellaneous cluster-level settings.
These are, respectively: cluster-level allocation settings, disk-watermark-based allocation, shard allocation awareness / forced awareness, and cluster-level shard allocation filtering.
On top of these there is also index-level shard allocation filtering.
Reference: https://www.cnblogs.com/memento/p/14494010.html — this blog post lays out the settings and their explanations clearly. A small illustrative example of these setting keys follows.
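A minimal sketch of what these settings look like in code, using OpenSearch's org.opensearch.common.settings.Settings builder. The setting keys are the documented ones; the concrete values and the attribute name "zone" are only illustrative:
import org.opensearch.common.settings.Settings;

public class AllocationSettingsSketch {
    public static void main(String[] args) {
        // Cluster-level: enable/disable allocation, disk watermarks, awareness, node filtering.
        Settings clusterLevel = Settings.builder()
            .put("cluster.routing.allocation.enable", "all")                // all / primaries / new_primaries / none
            .put("cluster.routing.allocation.disk.watermark.low", "85%")    // disk-based allocation
            .put("cluster.routing.allocation.disk.watermark.high", "90%")
            .put("cluster.routing.allocation.awareness.attributes", "zone") // shard allocation awareness
            .put("cluster.routing.allocation.exclude._ip", "10.0.0.1")      // cluster-level filtering (e.g. decommissioning)
            .build();
        // Index-level: shard allocation filtering for a single index.
        Settings indexLevel = Settings.builder()
            .put("index.routing.allocation.require.zone", "zone-a")
            .build();
        System.out.println(clusterLevel + " | " + indexLevel);
    }
}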
Source code analysis
When the cluster restarts, a full allocation is triggered once master election has completed. Beyond that, creating an index, or nodes joining and leaving, also trigger allocation of some indices' shards. Although the allocation process and strategy differ between these cases, overall they are abstracted into two components (a rough sketch of how they interact follows this list):
- allocators: try to find the most suitable node on which to place a shard
- deciders: judge and decide whether a given allocation may actually happen
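The following is only an illustrative sketch in plain Java (not OpenSearch's actual classes): the allocator side orders candidate nodes, and for each candidate every decider is consulted; a single NO vetoes the node and a THROTTLE postpones the placement:
import java.util.Comparator;
import java.util.List;
import java.util.Map;

enum Verdict { YES, NO, THROTTLE }

interface Decider {                                  // stand-in for an AllocationDecider
    Verdict canAllocate(String shard, String node);
}

class AllocationSketch {
    // Combine all decider verdicts for one (shard, node) pair:
    // any NO vetoes the node, otherwise any THROTTLE delays it, otherwise it is acceptable.
    static Verdict decide(List<Decider> deciders, String shard, String node) {
        Verdict result = Verdict.YES;
        for (Decider d : deciders) {
            Verdict v = d.canAllocate(shard, node);
            if (v == Verdict.NO) return Verdict.NO;
            if (v == Verdict.THROTTLE) result = Verdict.THROTTLE;
        }
        return result;
    }

    // The allocator side: order candidate nodes by some weight (here simply the shard count),
    // then place the shard on the first node the deciders accept.
    static String allocate(String shard, List<String> nodes, List<Decider> deciders, Map<String, Integer> shardCounts) {
        nodes.sort(Comparator.comparingInt(n -> shardCounts.getOrDefault(n, 0)));
        for (String node : nodes) {
            if (decide(deciders, shard, node) == Verdict.YES) {
                return node;
            }
        }
        return null; // stays unassigned (or throttled) until a later reroute
    }
}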
a) Newly created index
The allocators only count the number of shards on each node and then sort the nodes by that count (just the count, not the load). The deciders then walk the nodes one by one and judge whether the index's shard may be placed on each node; at this stage the deciders can take load, rules, disk capacity and so on into account.
b) Existing index
Here primaries and replicas are treated differently:
For a primary: the allocators only allow the primary shard to be assigned to a node that already holds a complete copy of that shard's data.
For a replica: the allocators first check whether some other node already holds a copy of that shard's data (even if that data is not up to date).
Triggers:
· index creation or deletion;
· node join or leave;
· manual reroute;
· rebalancing;
· a change in the number of replicas;
· cluster restart.
Shard allocation and rebalancing essentially all happen in AllocationService. AllocationService is a module that can affect the state of the whole cluster and belongs to ClusterModule.
AllocationService depends on:
- AllocationDeciders: the composite of all AllocationDecider instances, responsible for deciding whether a shard can be allocated
- ShardsAllocator: handles the relocation needed because of rebalancing or node failure; it balances the shard distribution across the cluster's nodes according to a weight-based strategy
- ClusterInfoService: provides some information about the cluster
- SnapshotsInfoService: todo
- ExistingShardsAllocator: handles allocation of shards whose data already exists on disk, i.e. finding existing shard copies (in tests or benchmarks a single GatewayAllocator is used as the only ExistingShardsAllocator; in production not just the GatewayAllocator is loaded, but also the ExistingShardsAllocators contributed by other ClusterPlugins)
The most important ones: ShardsAllocator, ExistingShardsAllocator, AllocationDeciders.
allocators
· primaryShardAllocator: finds the nodes that hold the most recent data of a given shard;
· replicaShardAllocator: finds nodes that already have a copy of the shard's data on disk;
· BalancedShardsAllocator: finds the node holding the fewest shards.
Inheritance relationships: (class diagrams omitted)
Dependency relationships: (diagram omitted)
deciders and their categories
Subclasses implement different strategies by overriding these interface methods (an illustrative, hypothetical decider sketch follows the Decision constants below):
canRebalance
canAllocate
canRemain
canForceAllocatePrimary
The strategies of the various subclasses can be grouped into the following categories:
- load balancing
- concurrency control
- conditional restrictions
In the end there are four possible results:
public static final Decision ALWAYS = new Single(Type.YES);
public static final Decision YES = new Single(Type.YES);
public static final Decision NO = new Single(Type.NO);
public static final Decision THROTTLE = new Single(Type.THROTTLE);
(diagram: subclasses of org.opensearch.cluster.routing.allocation.decider.AllocationDecider)
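As an illustration of the interface above, a hypothetical decider subclass could look like the following. The AllocationDecider and Decision types are the real ones from org.opensearch.cluster.routing.allocation.decider; the class name and the "temperature" node attribute are made up for this example:
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;

// Hypothetical decider: refuse to place shards on nodes tagged with the attribute temperature=cold.
public class HotNodesOnlyAllocationDecider extends AllocationDecider {

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        String temperature = node.node().getAttributes().get("temperature");
        if ("cold".equals(temperature)) {
            return Decision.NO;   // veto this node for this shard
        }
        return Decision.YES;      // other deciders may still answer NO or THROTTLE
    }

    // canRebalance / canRemain / canForceAllocatePrimary keep the permissive default behavior.
}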
Analysis of the reroute flow
External trigger: org.opensearch.cluster.routing.allocation.AllocationService#reroute(org.opensearch.cluster.ClusterState, org.opensearch.cluster.routing.allocation.command.AllocationCommands, boolean, boolean)
Internal call: org.opensearch.cluster.routing.allocation.AllocationService#reroute(org.opensearch.cluster.ClusterState, java.lang.String)
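For the external entry point, the caller (for example the handler of the cluster reroute API) supplies a set of AllocationCommands. Below is a rough sketch of building such a command set; the index name, shard ids and node names are made up, and the constructor signatures are assumed to match the 7.x command classes:
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;

public class RerouteCommandsSketch {
    // Hypothetical commands: move shard 0 of "logs-2023" from node-1 to node-2,
    // and cancel the copy of shard 1 currently on node-3 (allowPrimary = false).
    static AllocationCommands buildCommands() {
        return new AllocationCommands(
            new MoveAllocationCommand("logs-2023", 0, "node-1", "node-2"),
            new CancelAllocationCommand("logs-2023", 1, "node-3", false)
        );
    }
    // The returned value is the AllocationCommands argument of the first reroute overload above;
    // the two boolean flags of that overload correspond to "explain" and "retry_failed".
}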
Internal interface call
It may be invoked when the cluster state or settings change, on restart, and in similar situations.
Take cluster restart, and specifically shard recovery during metadata recovery, as an example:
AllocationService.reroute performs allocation for one or more primary or replica shards, and the allocation produces a new cluster state. The master node broadcasts the new cluster state, which triggers the subsequent steps. For calls from internal modules the return value is the newly produced cluster state; for a manually executed reroute command, the result of the command execution is returned.
public ClusterState reroute(ClusterState clusterState, String reason) {
    ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); // reroute cares not only about shards but also about (auto-expanded) replica counts
    // obtain a mutable RoutingNodes; for the difference between RoutingNodes and RoutingTable see: ...todo
    RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); // built starting from the RoutingTable, see the method body for details
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation( // holds the allocation state of the shards in routingNodes, plus the deciders responsible for allocation
        allocationDeciders,
        routingNodes,
        fixedClusterState,
        clusterInfoService.getClusterInfo(),
        snapshotsInfoService.snapshotShardSizes(),
        currentNanoTime()
    );
    reroute(allocation); // core logic
    if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { // nothing changed, i.e. there was nothing that needed rerouting
        return clusterState;
    } // something changed: build the reroute result
    return buildResultAndLogHealthChange(clusterState, allocation, reason); // returns the new ClusterState;
}   // it is up to the caller how the new state is handled -- usually it is broadcast, goes through the two-phase commit, and then the data nodes can learn it
A closer look at how RoutingNodes is built:
public RoutingNodes(ClusterState clusterState, boolean readOnly) {
    this.readOnly = readOnly;
    final RoutingTable routingTable = clusterState.routingTable(); // building starts from the RoutingTable
    // fill in the nodeToShards with the "live" nodes
    for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) {
        String nodeId = cursor.value.getId();
        this.nodesToShards.put(cursor.value.getId(), new RoutingNode(nodeId, clusterState.nodes().get(nodeId)));
    } // build node <-> RoutingNode (empty for now, no shard data yet)
    // now start filling them in
    // fill in the inverse of node -> shards allocated
    // also fill replicaSet information // indexRoutingTable represents one index, indexShard one shard
    for (ObjectCursor<IndexRoutingTable> indexRoutingTable : routingTable.indicesRouting().values()) {
        for (IndexShardRoutingTable indexShard : indexRoutingTable.value) {
            assert indexShard.primary != null;
            for (ShardRouting shard : indexShard) { // three nested loops; finally down to the shard level
                // to get all the shards belonging to an index, including the replicas,
                // we define a replica set and keep track of it. A replica set is identified
                // by the ShardId, as this is common for primary and replicas.
                // A replica Set might have one (and not more) replicas with the state of RELOCATING.
                if (shard.assignedToNode()) { // find the node the shard lives on; if the shard has a nodeId but that nodeId is not yet in nodesToShards, add it
                    RoutingNode routingNode = this.nodesToShards.computeIfAbsent( // the "not yet in nodesToShards" case can happen when
                        shard.currentNodeId(),                                    // a node went offline: it was in the routingTable before but is not there now?
                        k -> new RoutingNode(shard.currentNodeId(), clusterState.nodes().get(shard.currentNodeId())) // (this does not happen in the cluster-restart recovery scenario, though)
                    ); // what is being built here is node <-> allocated shards; even an offline node should have an entry, as long as it still has shards
                    routingNode.add(shard);
                    assignedShardsAdd(shard); // mark it as assigned
                    if (shard.relocating()) { // this assigned shard is currently relocating; it is itself the source
                        relocatingShards++;   // update the counter
                        // Add the counterpart shard with relocatingNodeId reflecting the source from which
                        // it's relocating from.
                        routingNode = nodesToShards.computeIfAbsent(
                            shard.relocatingNodeId(), // for a shard in RELOCATING state, relocatingNodeId() is the node it is relocating to; the target copy is registered on that node
                            k -> new RoutingNode(shard.relocatingNodeId(), clusterState.nodes().get(shard.relocatingNodeId()))
                        );
                        ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
                        addInitialRecovery(targetShardRouting, indexShard.primary); // build both directions, like the two endpoints of an edge in a graph;
                        routingNode.add(targetShardRouting);                        // the line above, though, increments the node's recovery out-degree, i.e. how many replica copies are currently recovering from this primary
                        assignedShardsAdd(targetShardRouting);
                    } else if (shard.initializing()) { // may be a primary or a replica
                        if (shard.primary()) {
                            inactivePrimaryCount++; // update the inactive counters
                        }
                        inactiveShardCount++;
                        addInitialRecovery(shard, indexShard.primary);
                    }
                } else {
                    unassignedShards.add(shard); // record it as unassigned
                }
            }
        }
    }
}
Calling the internal reroute logic:
- clear shards whose allocation delay has timed out so that they can be allocated again
- allocate shards that already have existing data first
- evaluate priority; higher-priority indices are always recovered first
- run the pre-allocation steps of the existing-data shard allocators, e.g. clearing caches, so that stale state does not affect the decision
- allocate primary shards first
- then prepare for replica allocation: if a replica copy is already initializing somewhere, i.e. it already has data, it is better to keep it on that node; otherwise moving it now would cause fresh copy work. Why don't primaries need this step? Because primary allocation is, in effect, already a search for nodes that hold valid data.
- allocate replicas
- then invoke the ShardsAllocator to keep the cluster balanced
- finally return; if any reroute work was done, buildResultAndLogHealthChange builds the new ClusterState, which is then published via the two-phase commit so that the data nodes can learn it
private void reroute(RoutingAllocation allocation) {
    assert hasDeadNodes(allocation) == false : "dead nodes should be explicitly cleaned up. See disassociateDeadNodes";
    assert AutoExpandReplicas.getAutoExpandReplicaChanges(allocation.metadata(), allocation).isEmpty()
        : "auto-expand replicas out of sync with number of nodes in the cluster";
    assert assertInitialized();
    removeDelayMarkers(allocation); // if a shard's allocation was delayed because a node left and the delay has timed out, remove the delay marker so it can be allocated again
    // first, give priority to shards that already have an existing copy
    allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
    shardsAllocator.allocate(allocation); // the ShardsAllocator is mainly there to keep the cluster balanced
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
}
private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
    allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering
    // ordered by the "index.priority" setting, so that high-priority indices are recovered first
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.beforeAllocation(allocation); // preparation each ExistingShardsAllocator performs before allocation,
    }                                                         // e.g. GatewayAllocator validates and clears some caches
    // allocate primaries
    final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
    while (primaryIterator.hasNext()) {
        final ShardRouting shardRouting = primaryIterator.next();
        if (shardRouting.primary()) { // delegate to the underlying primary allocator
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
        }
    }
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.afterPrimariesBeforeReplicas(allocation); // prepare for replica allocation;
    }   // e.g. the Gateway strategy: if a copy of the replica is already being recovered, its allocation may perhaps be skipped, which avoids extra copying
    final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
    while (replicaIterator.hasNext()) {
        final ShardRouting shardRouting = replicaIterator.next();
        if (shardRouting.primary() == false) { // delegate to the underlying replica allocator
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
        }
    }
}