Elasticsearch 5.x 源码分析(11)Shard
这几天刚实操了一把shard 的relocation, 在cerebro 上看着图标移来晃去觉得很爽,但之前并没深究,也没读过这部分的代码。前几晚夜间上线需要重启所有的ES机器,大家也对ES的relocation 讨论了一番,这次就顺便读读这部分的代码。
这里不再讨论shard 在分配的一些算法,和一些基础概念,如果对分片如何分到一个节点的算法有兴趣,或者对一些概念还不熟悉的话,建议可以看下面文章,或者我之前的文章:
Elasticsearch 5.x 源码分析(7)Shard Allocation 的一些小细节
elasticsearch源码分析之Allocation模块
这次打算用一个例子来贯穿大部分的allocation的入口,如下图:
![](https://img.haomeiwen.com/i6181457/0d95b64fabaff365.png)
先假设我们有三个节点,并创建了一个index,2分片2replica。0p 和1r 坐落在Node1 上,1p 和0r 坐落在Node2, 现在我们先假设Node1 Node2都shutdown了,现在这4个分片都是
UNassigned
状态。
1. 重启,初始化分配
这里先撇开集群把shard 进行飘移的问题,简单看一下shard是怎么 assign 的,node1 和node2 启动之前,在集群里面,分片0 和分片1 都会出于 UNassigned
状态;当Node1启动完,它会主动去ping Master节点,在前面的文章都知道了,这时会完整data node的注册的所有步骤,同时Master会下发最新的clusterState 下来。然后在Node1 内部会触发ClusterStateChangeEvent
,这个事件会触发多个modules或者Service去处理逻辑,这里我们关心的逻辑在 IndicesClusterStateService
里。
![](http://upload-images.jianshu.io/upload_images/6181457-7a937b4dbb520d6c.png)
当处理到检测分配到自己的shard是否存在时,Node1 会检测这个shard的状态,如没有,则会去创建一个shard,有的话则会检测是否需要更新状态
![](http://upload-images.jianshu.io/upload_images/6181457-b3af1314b5c1e418.png)
由于Node1 刚刚启动,它还没有去加载 0p 和1r,因此会进入createShard()流程,并在后面跑去加载它本机的shard,这部分的逻辑在createShard() 里面的indicesService.createShard()中
![](http://upload-images.jianshu.io/upload_images/6181457-e7cda5bb6e84e001.png)
最后Node1 启动完后会把0p 和1r 标记成
initializing
并上报给Master,这样Master就知道这个shard已经被assigned
了并标记成initialzing
了。这里介绍的是一个正向的例子,那么一些反向的例子,假如,在Node1 启动的时候,其实过了很久了,0p早就已经分配到其他机器了,那么Master 发过来的
ClusterState
中的0p 就已经是assigned状态了,那么这段逻辑就不是在 createOrUpdateShards(state);
处理了,而是之前的removeUnallocatedIndices(state)
,failMissingShards(state)
和removeShards(state)
来处理,大家有兴趣可以分别过过这两个代码。
现在回到这个正向的例子,稍等片刻之后,Node1 已经完全初始化完0p 和1r 了,那么node1 就会把这两个 shard 置成started
状态,等下一个时间间隔的ClusterState
心跳过来时,显而易见就会进入updateShard(nodes, shardRouting, shard, routingTable, state);
![](http://upload-images.jianshu.io/upload_images/6181457-5c686c247c6baff1.png)
在这里Node1 就会主动发送一个Action 给master 来请求 做
startedShard
这个Action(最后一行)。在
ShardStateAction
里,有两种请求需要处理,分别是
ShardStartedClusterStateTaskExecutor
ShardFailedClusterStateTaskExecutor
分别处理启用或者mark fail,其中我们看看ShardStartedClusterStateTaskExecutor
的逻辑
![](http://upload-images.jianshu.io/upload_images/6181457-2667adcb70b0b388.png)
从代码看出,这里最终要的一个地方就是,并不是 node上报什么状态,Master就会不分青红皂白一味记录,它需要通过
allocationService.applyStartedShards()
来自己去校验和消费这个状态,最后则生成一个新的ClusterState
,这会体现在下一次的心跳下发。
而ShardFailedClusterStateTaskExecutor
则是处理Node上报的failShard的事情,这种情况我觉得比较少见
经过这一轮后,Node1 上的所有shards都恢复正常,那么Node2 也按这个流程,最后Node1 Node2 启动完后就是下图
![](http://upload-images.jianshu.io/upload_images/6181457-a384feab4068fd33.png)
2. Node1 挂了,Master标记0p,1r
刚刚上面说到,ShardFailedClusterStateTaskExecutor
这种主动上报一般是比较少见的,我们见更多一般是Node1 重启或者进程挂掉,从
我们曾经提过,ZenDiscovery用于处理一切的Node事件变更,那我们就找找这段代码
![](http://upload-images.jianshu.io/upload_images/6181457-f2125b70c80b24a2.png)
![](http://upload-images.jianshu.io/upload_images/6181457-0f284570b6360b9b.png)
继续跟下去,这次我们只关心和shard allocation 相关的代码,处理Node fail 的话会执行一个NodeRemovalClusterStateTaskExecutor.Task
在这里我们找到了我们想找到的东西
![](http://upload-images.jianshu.io/upload_images/6181457-665f69d7f455d192.png)
这里就会把这台机的所有shard进行 failShard()处理,也就是会mark UNassigned 等等。那么在下一次的
ClusterState
同步事件时其他所有节点就会知道了
![](http://upload-images.jianshu.io/upload_images/6181457-60709071f8f83ee8.png)
3. Cluster Reroute
有时候运维需要,比如我们这次需要全部升级ES,那么往往我们需要手工的去锁住禁止集群进行 allocation,rebalance 等,甚至我们在加机器之后我们希望是手工自己去分配分片,那么就要用到cluster Reroute。相对应的Action是RestClusterRerouteAction
和TransportClusterRerouteAction
,而最后会是由 allocationService.reroute()
来承载,在这个例子里,加入我手工吧0p从Node1 移动到Node3 去,看会发生什么事情。
![](http://upload-images.jianshu.io/upload_images/6181457-e7c2d81bf1eec0f9.png)
![](http://upload-images.jianshu.io/upload_images/6181457-e833a182e7f0c14d.png)
上面代码最重要就是那句
RoutingExplanations explanations = commands.execute(allocation, explain);
在MoveAllocationCommand
中会对将要Reroute的操作进行一系列的判断,比如canAllocate()
![](http://upload-images.jianshu.io/upload_images/6181457-850feb9587134678.png)
![](http://upload-images.jianshu.io/upload_images/6181457-1ce6335f31acc272.png)
仔细留意的话,这里的targetRelocatingShard 其实是一个
PeerRecoverySource
的shard,这是就会把这个shard标记成INITIALIZING
状态。
![](http://upload-images.jianshu.io/upload_images/6181457-cb5e98d1cf94bdcd.png)
那这个状态就会在ClusterState 中并下次心跳同步到目标节点去。也就是说Node3 在下次获取ClusterState时,就会得到新的ShardRouting 请求了。
这时Node3 还是回到 第1章中的方法里,只是这时Node3 并没有0r这个Shard,并且,Node3就会开始尝试从Node1 处去回复这个Shard了
![](http://upload-images.jianshu.io/upload_images/6181457-974a33e2641e5067.png)
后话
我写这篇文章的初衷是,我当时很好奇,Node3 是如何恢复0p的呢,因为0p好歹是个Primary 呀,并且这时,Node1 会继续服务吗?
继续服务这个是肯定的,至于Node3是如何恢复的,我的一个猜测是首先Node3会像恢复一个replica 那样把Node1的分片拷贝过去,接着,在这段时间的增量数据将会继续同步translog 的方式同步到Node3,到最后,直到同步到一个最新的点,这时Node1 和Node3的分片都是同步的,然后就直接做个切换,0p就成功切换到Node3去了,关于Recovery这块我没有再继续读下去了,知道的朋友欢迎也顺便告知我一下。
由于Allocation这部分代码非常复杂,因此我也没有完全弄懂,有些点也是我自己的推测而成,如有错误欢迎指正和讨论。