11、详解Controller选举实现原理

2020-07-26  本文已影响0人  技术灭霸

Controller 选举,是指 Kafka 选择集群中一台 Broker 行使 Controller 职责。整个选举过 程分为两个步骤:触发选举和开始选举。

触发选举

我先用一张图展示下可能触发 Controller 选举的三个场景。


  1. 集群从零启动时;
  2. Broker 侦测 /controller 节点消失时;
  3. Broker 侦测到 /controller 节点数据发生变更时。

场景一:集群从零启动

集群首次启动时,Controller 尚未被选举出来。于是,Broker 启动后,首先将 Startup 这 个 ControllerEvent 写入到事件队列中,然后启动对应的事件处理线程和 ControllerChangeHandler ZooKeeper 监听器,最后依赖事件处理线程进行 Controller 的选举。

在源码中,KafkaController 类的 startup 方法就是做这些事情的。当Broker 启动时,它 会调用这个方法启动 ControllerEventThread 线程。值得注意的是,每个 Broker 都需要 做这些事情,不是说只有 Controller 所在的 Broker 才需要执行这些逻辑。

startup 方法的主体代码如下:

  def startup() = {
    // 第1步:注册Zookeeper状态变更监听器,它是用于监听Zookeeper会话过期
    zkClient.registerStateChangeHandler(new StateChangeHandler {
      override val name: String = StateChangeHandlers.ControllerHandler
      override def afterInitializingSession(): Unit = {
        eventManager.put(RegisterBrokerAndReelect)
      }
      override def beforeInitializingSession(): Unit = {
        val queuedEvent = eventManager.clearAndPut(Expire)
        queuedEvent.awaitProcessing()
      }
    })
    // 第2步:写入Startup事件到事件队列
    eventManager.put(Startup)
    // 第3步:启动ControllerEventThread线程,开始处理事件队列中的ControllerEvent
    eventManager.start()
  }

首先,startup 方法会注册 ZooKeeper 状态变更监听器,用于监听 Broker 与 ZooKeeper 之间的会话是否过期。接着,写入 Startup 事件到事件队列,然后启动 ControllerEventThread 线程,开始处理事件队列中的 Startup 事件。

接下来,我们来学习下 KafkaController 的 process 方法处理 Startup 事件的方法:

  override def process(event: ControllerEvent): Unit = {
    try {
      event match {
        case Startup =>
          processStartup()
      }
  }

 private def processStartup(): Unit = {
    // 注册ControllerChangeHandler ZooKeeper监听器
 zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
// 执行Controller选举
    elect()
  }

这三种场景都要选举 Controller,因此,我们最后统一学习 elect 方法的代码实现。

总体来说,集群启动时,Broker 通过向事件队列“塞入”Startup 事件的方式,来触发 Controller 的竞选。

场景二:/controller 节点消失

Broker 检测到 /controller 节点消失时,就意味着,此时整个集群中没有 Controller。因此,所有检测到 /controller 节点消失的 Broker,都会立即调用 elect 方法执行竞选逻辑。

场景三:/controller 节点数据变更

Broker 检测到 /controller 节点数据发生变化,通常表明,Controller“易主”了,这就分为两种情况:

  1. 如果 Broker 之前是 Controller,那么该 Broker 需要首先执行卸任操作,然后再尝试竞选;
  2. 如果 Broker 之前不是 Controller,那么,该 Broker 直接去竞选新 Controller。

卸任逻辑是由 onControllerResignation 方法执行的,它主要是用于清空各种数据结构的值、取消 ZooKeeper 监听器、关闭各种状态机以及管理器,等等。我用注释的方式给出它的逻辑实现:

  private def onControllerResignation(): Unit = {
    debug("Resigning")
    // 取消ZooKeeper监听器的注册
    zkClient.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
    zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
    zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
    zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
    unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)

    // 关闭Kafka线程调度器,其实就是取消定期的Leader重选举
    kafkaScheduler.shutdown()
    // 将统计字段全部清0
    offlinePartitionCount = 0
    preferredReplicaImbalanceCount = 0
    globalTopicCount = 0
    globalPartitionCount = 0
    topicsToDeleteCount = 0
    replicasToDeleteCount = 0
    ineligibleTopicsToDeleteCount = 0
    ineligibleReplicasToDeleteCount = 0

    // 关闭Token过期检查调度器
    if (tokenCleanScheduler.isStarted)
      tokenCleanScheduler.shutdown()

    // 取消分区重分配监听器的注册
    unregisterPartitionReassignmentIsrChangeHandlers()
    // 关闭分区状态机
    partitionStateMachine.shutdown()
    // 取消主题变更监听器的注册
    zkClient.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
    // 取消分区变更监听器的注册
    unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
    // 取消主题删除监听器的注册
    zkClient.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
    // 关闭副本状态机
    replicaStateMachine.shutdown()
    // 取消Broker变更监听器的注册
    zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
    // 关闭Controller通道管理器
    controllerChannelManager.shutdown()
    // 清空集群元数据
    controllerContext.resetContext()
    info("Resigned")
  }

选举 Controller

这三种选举场景最后都会调用 elect 方法来执行选举逻辑。我们来看下它的实现:

private def elect(): Unit = {
    // 第1步:获取当前Controller所在Broker的序列号,如果Controller不存在,显式标记为-1
    activeControllerId = zkClient.getControllerId.getOrElse(-1)
    // 第2步:如果当前Controller已经选出来了,直接返回即可
    if (activeControllerId != -1) {
      debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
      return
    }

    try {
      // 第2步:注册Controller相关信息
      val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
      // 主要是创建/controller节点
      controllerContext.epoch = epoch
      controllerContext.epochZkVersion = epochZkVersion
      activeControllerId = config.brokerId

      info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
        s"and epoch zk version is now ${controllerContext.epochZkVersion}")
      // 第4步:执行当选Controller的后续逻辑
      onControllerFailover()
    } catch {
      case e: ControllerMovedException =>
        maybeResign()

        if (activeControllerId != -1)
          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e)
        else
          warn("A controller has been elected but just resigned, this will result in another round of election", e)

      case t: Throwable =>
        error(s"Error while electing or becoming controller on broker ${config.brokerId}. " +
          s"Trigger controller movement immediately", t)
        triggerControllerMove()
    }
  }

该方法首先检查 Controller 是否已经选出来了。要知道,集群中的所有 Broker 都要执行 这些逻辑,因此,非常有可能出现某些 Broker 在执行 elect 方法时,Controller 已经被选 出来的情况。如果 Controller 已经选出来了,那么,自然也就不用再做什么了。相反地, 如果 Controller 尚未被选举出来,那么,代码会尝试创建 /controller 节点去抢注 Controller。

一旦抢注成功,就调用 onControllerFailover 方法,执行选举成功后的动作。这些动作包 括注册各类 ZooKeeper 监听器、删除日志路径变更和 ISR 副本变更通知事件、启动 Controller 通道管理器,以及启动副本状态机和分区状态机。

如果抢注失败了,代码会抛出 ControllerMovedException 异常。这通常表明 Controller 已经被其他 Broker 抢先占据了,那么,此时代码调用 maybeResign 方法去执行卸任逻 辑。

总结

1、Controller 依赖 ZooKeeper 实现 Controller 选举,主要是借助于 /controller 临时节 点和 ZooKeeper 的监听器机制。
2、Controller 触发场景有 3 种:集群启动时;/controller 节点被删除时;/controller 节 点数据变更时。
3、源码最终调用 elect 方法实现 Controller 选举。

上一篇下一篇

猜你喜欢

热点阅读