11、详解Controller选举实现原理
Controller 选举,是指 Kafka 选择集群中一台 Broker 行使 Controller 职责。整个选举过 程分为两个步骤:触发选举和开始选举。
触发选举
我先用一张图展示下可能触发 Controller 选举的三个场景。
- 集群从零启动时;
- Broker 侦测 /controller 节点消失时;
- Broker 侦测到 /controller 节点数据发生变更时。
场景一:集群从零启动
集群首次启动时,Controller 尚未被选举出来。于是,Broker 启动后,首先将 Startup 这 个 ControllerEvent 写入到事件队列中,然后启动对应的事件处理线程和 ControllerChangeHandler ZooKeeper 监听器,最后依赖事件处理线程进行 Controller 的选举。
在源码中,KafkaController 类的 startup 方法就是做这些事情的。当Broker 启动时,它 会调用这个方法启动 ControllerEventThread 线程。值得注意的是,每个 Broker 都需要 做这些事情,不是说只有 Controller 所在的 Broker 才需要执行这些逻辑。
startup 方法的主体代码如下:
def startup() = {
// 第1步:注册Zookeeper状态变更监听器,它是用于监听Zookeeper会话过期
zkClient.registerStateChangeHandler(new StateChangeHandler {
override val name: String = StateChangeHandlers.ControllerHandler
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
override def beforeInitializingSession(): Unit = {
val queuedEvent = eventManager.clearAndPut(Expire)
queuedEvent.awaitProcessing()
}
})
// 第2步:写入Startup事件到事件队列
eventManager.put(Startup)
// 第3步:启动ControllerEventThread线程,开始处理事件队列中的ControllerEvent
eventManager.start()
}
首先,startup 方法会注册 ZooKeeper 状态变更监听器,用于监听 Broker 与 ZooKeeper 之间的会话是否过期。接着,写入 Startup 事件到事件队列,然后启动 ControllerEventThread 线程,开始处理事件队列中的 Startup 事件。
接下来,我们来学习下 KafkaController 的 process 方法处理 Startup 事件的方法:
override def process(event: ControllerEvent): Unit = {
try {
event match {
case Startup =>
processStartup()
}
}
private def processStartup(): Unit = {
// 注册ControllerChangeHandler ZooKeeper监听器
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
// 执行Controller选举
elect()
}
这三种场景都要选举 Controller,因此,我们最后统一学习 elect 方法的代码实现。
总体来说,集群启动时,Broker 通过向事件队列“塞入”Startup 事件的方式,来触发 Controller 的竞选。
场景二:/controller 节点消失
Broker 检测到 /controller 节点消失时,就意味着,此时整个集群中没有 Controller。因此,所有检测到 /controller 节点消失的 Broker,都会立即调用 elect 方法执行竞选逻辑。
场景三:/controller 节点数据变更
Broker 检测到 /controller 节点数据发生变化,通常表明,Controller“易主”了,这就分为两种情况:
- 如果 Broker 之前是 Controller,那么该 Broker 需要首先执行卸任操作,然后再尝试竞选;
- 如果 Broker 之前不是 Controller,那么,该 Broker 直接去竞选新 Controller。
卸任逻辑是由 onControllerResignation 方法执行的,它主要是用于清空各种数据结构的值、取消 ZooKeeper 监听器、关闭各种状态机以及管理器,等等。我用注释的方式给出它的逻辑实现:
private def onControllerResignation(): Unit = {
debug("Resigning")
// 取消ZooKeeper监听器的注册
zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
// 关闭Kafka线程调度器,其实就是取消定期的Leader重选举
kafkaScheduler.shutdown()
// 将统计字段全部清0
offlinePartitionCount = 0
preferredReplicaImbalanceCount = 0
globalTopicCount = 0
globalPartitionCount = 0
topicsToDeleteCount = 0
replicasToDeleteCount = 0
ineligibleTopicsToDeleteCount = 0
ineligibleReplicasToDeleteCount = 0
// 关闭Token过期检查调度器
if (tokenCleanScheduler.isStarted)
tokenCleanScheduler.shutdown()
// 取消分区重分配监听器的注册
unregisterPartitionReassignmentIsrChangeHandlers()
// 关闭分区状态机
partitionStateMachine.shutdown()
// 取消主题变更监听器的注册
zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
// 取消分区变更监听器的注册
unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
// 取消主题删除监听器的注册
zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
// 关闭副本状态机
replicaStateMachine.shutdown()
// 取消Broker变更监听器的注册
zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
// 关闭Controller通道管理器
controllerChannelManager.shutdown()
// 清空集群元数据
controllerContext.resetContext()
info("Resigned")
}
选举 Controller
这三种选举场景最后都会调用 elect 方法来执行选举逻辑。我们来看下它的实现:
private def elect(): Unit = {
// 第1步:获取当前Controller所在Broker的序列号,如果Controller不存在,显式标记为-1
activeControllerId = zkClient.getControllerId.getOrElse(-1)
// 第2步:如果当前Controller已经选出来了,直接返回即可
if (activeControllerId != -1) {
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
try {
// 第2步:注册Controller相关信息
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
// 主要是创建/controller节点
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
s"and epoch zk version is now ${controllerContext.epochZkVersion}")
// 第4步:执行当选Controller的后续逻辑
onControllerFailover()
} catch {
case e: ControllerMovedException =>
maybeResign()
if (activeControllerId != -1)
debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
else
warn("A controller has been elected but just resigned, this will result in another round of election", e)
case t: Throwable =>
error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
s"Trigger controller movement immediately", t)
triggerControllerMove()
}
}
该方法首先检查 Controller 是否已经选出来了。要知道,集群中的所有 Broker 都要执行 这些逻辑,因此,非常有可能出现某些 Broker 在执行 elect 方法时,Controller 已经被选 出来的情况。如果 Controller 已经选出来了,那么,自然也就不用再做什么了。相反地, 如果 Controller 尚未被选举出来,那么,代码会尝试创建 /controller 节点去抢注 Controller。
一旦抢注成功,就调用 onControllerFailover 方法,执行选举成功后的动作。这些动作包 括注册各类 ZooKeeper 监听器、删除日志路径变更和 ISR 副本变更通知事件、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。
如果抢注失败了,代码会抛出 ControllerMovedException 异常。这通常表明 Controller 已经被其他 Broker 抢先占据了,那么,此时代码调用 maybeResign 方法去执行卸任逻 辑。
总结
1、Controller 依赖 ZooKeeper 实现 Controller 选举,主要是借助于 /controller 临时节 点和 ZooKeeper 的监听器机制。
2、Controller 触发场景有 3 种:集群启动时;/controller 节点被删除时;/controller 节 点数据变更时。
3、源码最终调用 elect 方法实现 Controller 选举。