Elasticsearch 5.x 源码分析(10)从Clust
应网友要求,周末看了一下ClusterService
这个类,梳理了一下;看看ES是如何响应和处理一个clusterStateChange
的。
什么是ClusterState
在我看来,ClusterState就是整个ES进程的所有状态的逻辑封装,里面包含了各式各样的功能的Settings和Configs,每个nodes最终都需要维护一个一致的ClusterState属性。在5.0 以后,为了节省网络带宽,ES允许相邻的两个版本可以只发送 diff 数据,而不需要每次都发送整个全量的clusterState。
ClusterService
的成员变量不多,重点关注几个它和外部沟通的变量:
这里有三类比较重要的变量
-
clusterStatePublisher
是用来把本机产生的clusterStateChange
事件发布到所有nodes 用 -
stateAppliers
当产生一个新event时,这些消费者会消费这些changeEvent
-
clusterStateListener
当完成一个state 的更新后,触发listener通知其改变了
另外还有三个同名的方法,如下图,用于给外部调用提交一个更新事件。
ClusterService::SubmitStateUpdateTask()
调用链从这里开始,如果ES希望去发布一个changeEvent
那么就需要调用这三个同名方法之一;那么谁会去触发一个changeEvent
呢?那就多着了,比如Mapping 改变了,shard挂了,有新节点加进来,and so on; 有兴趣的点击调用链自己去看一下,这里不再赘述。
在
submitStateUpdateTask()
方法里,会根据入参的config
、executor
、task
等,根据优先级等因素最后用一个UpdateTask
类来封装并存入队列。UpdateTask
主要起的就是一个调度作用。最后还是回调了ClusterService
的runTasks()
方法。
ClusterService::runTasks()
runTasks 做了一些基本校验之后,就进入方法publishAndAppliyChanges()
方法,这是一个非常核心的方法,不过做的事情是比较简单的:
- 根据最新状态重新确定和节点的连接
- 如果其本身就是一个Master节点,那么就需要把这个
changeState
推送到所有节点去,其中的ClusterStatePublisher
下面会介绍
-
更新集群配置
- 通知所有的appliers 去应用这个变更,
ClusterStateAppliers
也会稍后介绍
-
应用变更
-
通知listeners 变更
ClusterService ::clusterStatePublisher
ClusterService
本身是没有集群其他节点信息的,因此如果它是一个Master,并且它要把一个stateEvent
发布出去,唯有通过ZenDiscovery
,那么很简单,它只需要把ZenDiscovery:publish()
挂进来就可以了,初始化代码在Node.java
的初始化部分
Publish()
的一句核心语句就是
PublishClusterStateAction
publishClusterState
是一个PublishClusterStateAction
,顾名思义它主要负责和各个节点间的clusterChangeEvent
的发送和接受任务;
首先留意一下在什么情况下发送的是diffState,什么情况下发送fullState到其他节点(和ES版本有关),相对应的两个方法是sendClusterStateDiff()
和sendClusterStateFull()
在PublishClusterStateAction
里最总要的两对方法就是
-
sendClusterStateToNode()
和handleIncomingClusterStateRequest()
-
sendCommitToNode()
和
handleIncomingClusterStateRequest()
那这个是什么意思呢?还记得吗,ES的Master要向所有node发送一个状态变更的时候,需要有两个过程
- 调用
sendClusterStateToNode()
向所有nodes发送这个状态变更的通知,其他节点接收到的话调用handleIncomingClusterStateRequest()
来处理这个事件,这时它仅仅把这个事件存放在queue里而不是立刻应用这个状态变更,因为它需要等待Master的命令,况且,这个事件的前面说不准还有一堆的状态变更还没有响应的,所以其实它什么也没做,回复一个确认 - Master会调用
sendCommitToNode()
到所有的nodes ,当然,之前它必须得到过半数的确认说收到这个变更才行,那么所有收到这个请求的nodes就会调用handleIncomingClusterStateRequest()
去消费这个变更,注意在此之前这些nodes需要消费完queue之前的所有变更才行。而Master会启动一个timer来等待Response。
仔细阅读上面4个方法的代码,就是做上面两件事情的,注意代码里仍然有对diff 和full State的判断。
ClusterService::clusterStateAppliers 和ClusterService::clusterStateListeners
其实这两个都是一种listener,需要感知各种状态变更的modules
都需要往这里来注册自己的appliers或者listeners 来消费这些状态变更,至于他们之间的差别,主要就是前者是在应用之前需要做些什么,后者是应用了变更之后需要做些什么,有兴趣的可以点进去这些调用栈去看什么modules 注册了这些事件。
至此整个ClusterService 处理ClusterState 的调用链就基本走了一遍了,回顾一下我画了一张图来帮助记忆