
Kafka Server Source Code Analysis: ZooKeeper and the Controller

2021-01-23  tracy_668

[TOC]

ZooKeeper and Cluster Management

We know that every broker connects to ZooKeeper after it starts up. But concretely, what work does ZooKeeper do for Kafka?

The approach to cluster management

Broker "birth" and "death"

At any time, when a new broker joins the cluster or an existing broker dies, the other machines in the cluster need to know about it.

This is implemented by watching the /brokers/ids node in ZooKeeper: each of its child nodes corresponds to one broker. When a broker joins, the child list grows; when a broker dies, the child list shrinks.
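
As a minimal sketch of this mechanism (illustrative only, not Kafka source; the connection string, timeouts and the println are placeholders), registering such a child watch with the I0ITec ZkClient looks roughly like this:

import java.util.{List => JList}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// Placeholder connection string and timeouts, for illustration only.
val zkClient = new ZkClient("localhost:2181", 6000, 6000)

zkClient.subscribeChildChanges("/brokers/ids", new IZkChildListener {
  // currentChilds is the current list of live broker ids; diffing it against a
  // cached copy tells us which brokers just joined and which just died.
  override def handleChildChange(parentPath: String, currentChilds: JList[String]): Unit =
    println(s"live brokers under $parentPath: $currentChilds")
})

In Kafka itself only the controller registers this watch (the brokerChangeListener mentioned later), as explained next.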

Controller
To reduce the load on ZooKeeper, and also to lower the complexity of the whole distributed system, Kafka introduces a "central controller", the Controller.

The basic idea is: first elect one Controller among all brokers via ZooKeeper, and then let this Controller control all the other brokers, instead of having ZooKeeper control every machine directly.

For example, the watch on /brokers/ids above is not registered by every broker; only the Controller watches this node. This turns a "distributed" problem into a "centralized" one, which both reduces the load on ZooKeeper and makes the control logic easier to write.

Adding/deleting topics and partitions
Likewise, in a distributed cluster it is impractical to notify every machine one by one when a topic or partition is added or deleted.

The implementation follows the same idea: the management side (Admin/TopicCommand) writes the add/delete command to ZooKeeper, the Controller picks up the change through its ZooKeeper watches, and then distributes it to the relevant brokers.
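
As a hedged sketch of the "management side writes to ZooKeeper" half of this flow (illustrative only; the real tools go through TopicCommand/AdminUtils rather than raw ZkClient calls, and "my-topic" is a placeholder): deleting a topic just drops a marker node under /admin/delete_topics, and the controller's child watch on that path does the actual work.

import org.I0Itec.zkclient.ZkClient

val zkClient = new ZkClient("localhost:2181", 6000, 6000)

// Mark the topic for deletion; createParents = true creates /admin/delete_topics
// if it does not exist yet. The controller, not this client, performs the deletion.
zkClient.createPersistent("/admin/delete_topics/my-topic", true)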

I0ITec ZkClient
As for ZooKeeper clients, the commonly used one is Apache Curator, but Kafka does not use it. Instead it uses another client called I0ITec ZkClient; if I remember correctly, Alibaba's Dubbo framework uses it as well. Compared with Curator, it is more lightweight.

Concretely, it provides three main listener interfaces:

//invoked on connection state changes and when a session is dropped and re-established
public interface IZkStateListener {
    public void handleStateChanged(KeeperState state) throws Exception;

    public void handleNewSession() throws Exception;

    public void handleSessionEstablishmentError(final Throwable error) throws Exception;

}

//invoked when a node's data changes, or the node itself is deleted
public interface IZkDataListener {

    public void handleDataChange(String dataPath, Object data) throws Exception;

    public void handleDataDeleted(String dataPath) throws Exception;
}

//invoked when a node's children change
public interface IZkChildListener {
    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception;
}

Kafka uses exactly these three listeners to watch every ZooKeeper-related state change; their concrete uses will be covered one by one in later posts in this series.
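
To make the other two listener kinds concrete before we meet them in Kafka's code, here is a minimal sketch (not Kafka source; connection details are placeholders) wiring a state listener and a data listener the same way Kafka's SessionExpirationListener and LeaderChangeListener, shown later, do. Kafka stores plain UTF-8 strings in its znodes, so a string ZkSerializer is assumed here, mimicking Kafka's ZKStringSerializer:

import java.nio.charset.StandardCharsets.UTF_8
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState

// ZkClient's default serializer uses Java serialization; Kafka's znodes hold
// plain UTF-8 strings, so we plug in a string serializer instead.
val stringSerializer = new ZkSerializer {
  override def serialize(data: Object): Array[Byte] = data.asInstanceOf[String].getBytes(UTF_8)
  override def deserialize(bytes: Array[Byte]): Object = if (bytes == null) null else new String(bytes, UTF_8)
}
val zkClient = new ZkClient("localhost:2181", 6000, 6000, stringSerializer)

zkClient.subscribeStateChanges(new IZkStateListener {
  override def handleStateChanged(state: KeeperState): Unit = ()  // zkclient reconnects for us
  override def handleNewSession(): Unit =
    println("session expired and re-created: re-register ephemeral nodes, re-elect, ...")
  override def handleSessionEstablishmentError(error: Throwable): Unit =
    println(s"could not re-establish the session: $error")
})

zkClient.subscribeDataChanges("/controller", new IZkDataListener {
  override def handleDataChange(dataPath: String, data: Object): Unit =
    println(s"$dataPath now points at controller: $data")
  override def handleDataDeleted(dataPath: String): Unit =
    println(s"$dataPath deleted: the controller is gone, time to elect a new one")
})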

KafkaController Election: Failover and Resignation

Core components of the Kafka cluster

Before diving into the source code, let's first look at the core components of a Kafka server. We start from the server's main function:

//Kafka
  def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown   // shut the server down from the JVM shutdown hook
        }
      })

      kafkaServerStartable.startup  // start the server
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        System.exit(1)
    }
    System.exit(0)
  }

//KafkaServerStartable
class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }
  ...
}

//KafkaServer
  def startup() {
    try {
      info("starting")

      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

        brokerState.newState(Starting)

        // core component 0: KafkaScheduler (background task scheduler)
        kafkaScheduler.startup()

        zkUtils = initZk()

        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "
        // core component 1: SocketServer (network layer)
        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()

        // core component 2: ReplicaManager
        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
          isShuttingDown)
        replicaManager.startup()

        // core component 3: KafkaController
        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
        kafkaController.startup()

        // core component 4: GroupCoordinator
        consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
        consumerCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        // core component 5: KafkaApis + KafkaRequestHandlerPool
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),
                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
        // TODO: Move this logic to DynamicConfigManager
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
        }

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
        kafkaHealthcheck.startup()

        /* register broker metrics */
        registerStats()

        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }

Besides the code, let's also look at the shell scripts that start and stop the server:

bin/kafka-server-start.sh
It runs the server as a background daemon via nohup; the script details are not listed here.

# bin/kafka-server-stop.sh: stopping the process is simple. kill sends SIGTERM, and when the JVM receives the signal it runs the shutdown hook registered above.
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM   

From the server's startup function we can see the following core components:

1. SocketServer + KafkaApis: the former receives all network requests, the latter processes them
2. KafkaController: responsible for Controller election
3. GroupCoordinator (consumerCoordinator): consumer group coordination and rebalancing
4. ReplicaManager: manages this broker's partition replicas
5. KafkaScheduler: background task scheduling

Here we focus on KafkaController; the other core components will each be covered later.

How the election works

The entire election is implemented with an ephemeral node in ZooKeeper, /controller. Its data records, as the key piece of information, the brokerId of the current controller:

"version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp

When the controller dies, all the other brokers observe that this ephemeral node has disappeared and race to re-create it; whoever succeeds becomes the new Controller.
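
A simplified sketch of that race, assuming a zkClient constructed with a UTF-8 string serializer as in the earlier sketch (the real logic is elect() in ZookeeperLeaderElector, shown later, which additionally works around a known ZooKeeper session bug):

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Every broker calls this when it notices /controller is missing; exactly one
// createEphemeral succeeds, the rest get ZkNodeExistsException and wait for the
// next data-deleted notification.
def tryBecomeController(zkClient: ZkClient, brokerId: Int): Boolean = {
  val payload = s"""{"version":1,"brokerid":$brokerId,"timestamp":"${System.currentTimeMillis}"}"""
  try {
    zkClient.createEphemeral("/controller", payload)  // vanishes if our session dies
    true                                              // we are the new controller
  } catch {
    case _: ZkNodeExistsException => false            // someone else won the race
  }
}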

Besides the /controller node there is also an auxiliary /controller_epoch node, which records the epoch (generation number) of the current Controller.

KafkaController and ZookeeperLeaderElector

The whole election is implemented by these two core classes; ZookeeperLeaderElector is a member variable of KafkaController:

//a member variable of KafkaController
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)

The overall election interaction works as follows:
(1) KafkaController and ZookeeperLeaderElector each contain one listener: one watches for session re-establishment, the other watches the /controller node for changes.
(2) When the session is re-established, or the /controller node is deleted, elect() is called to start a new election. Before re-electing, the broker checks whether it was the previous Controller; if so, it first calls onResignation to step down.



Let's start from KafkaController's startup function:

  def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      registerSessionExpirationListener()   // listener #1: SessionExpirationListener
      isRunning = true
      controllerElector.startup   // listener #2: LeaderChangeListener (registered inside the elector)
      info("Controller startup complete")
    }
  }

  class SessionExpirationListener() extends IZkStateListener with Logging {
    ...
    @throws(classOf[Exception])
    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      inLock(controllerContext.controllerLock) {
        onControllerResignation()   // step down first
        controllerElector.elect   // then trigger a new election
      }
    }
    ...
  }
  
  class LeaderChangeListener extends IZkDataListener with Logging {

    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      inLock(controllerContext.controllerLock) {
        val amILeaderBeforeDataChange = amILeader
        leaderId = KafkaController.parseControllerId(data.toString)

        if (amILeaderBeforeDataChange && !amILeader)
          onResigningAsLeader()  // we used to be the controller but are not any more: step down
      }
    }


    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if(amILeader)
          // Key point: the /controller node may be gone not because this broker died,
          // but because its ZooKeeper session dropped while the process stayed alive.
          // In that case, step down first, then start a new election.
          onResigningAsLeader()
        elect   // trigger a new election
      }
    }
  }

Two key callbacks: Failover (taking office) and Resignation (stepping down)
In the election process above there are two key callbacks: what a newly elected Controller does when it takes office, and what the old Controller does when it steps down.

"Taking office" is easy to understand: a broker has just been elected controller. But why is there a "stepping down"?

Because ZooKeeper uses heartbeats to decide whether the controller is alive, the controller may actually be alive while ZooKeeper believes it is dead, and a new controller gets elected. When the old controller discovers it is stale, it must step down voluntarily.
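
Stepping down alone is not enough, because a stale controller may keep sending requests for a short while before it notices. This is what /controller_epoch is for: controller-to-broker requests carry the controller epoch, and brokers ignore requests whose epoch is older than the highest one they have seen. A purely conceptual sketch of that fencing idea (not Kafka's actual code; the class and field names here are made up):

// Conceptual only: how an epoch lets a broker fence out a stale controller.
final case class ControllerRequest(controllerId: Int, controllerEpoch: Int)

final class EpochFence {
  private var highestSeenEpoch: Int = 0

  /** Returns true if the request should be processed, false if it is stale. */
  def accept(req: ControllerRequest): Boolean = synchronized {
    if (req.controllerEpoch < highestSeenEpoch) {
      false                                   // stale controller: ignore
    } else {
      highestSeenEpoch = req.controllerEpoch  // remember the newest epoch
      true
    }
  }
}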

Let's look at what happens on "taking office" and on "stepping down":

  def onControllerFailover() {
    if(isRunning) {
      readControllerEpochFromZookeeper()
      // increment the controller epoch
      incrementControllerEpoch(zkUtils.zkClient) 
      
      // key point: take over all the watches on broker/partition znodes
      registerReassignedPartitionsListener()
      registerIsrChangeNotificationListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
     
      // register the partition change listeners for all existing topics on failover
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
      brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      /* send partition leadership info to all live brokers */
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

  def onControllerResignation() {
    // key point: give up all the watches on broker/partition znodes
    deregisterIsrChangeNotificationListener()
    deregisterReassignedPartitionsListener()
    deregisterPreferredReplicaElectionListener()

    // shutdown delete topic manager
    if (deleteTopicManager != null)
      deleteTopicManager.shutdown()

    // shutdown leader rebalance scheduler
    if (config.autoLeaderRebalanceEnable)
      autoRebalanceScheduler.shutdown()

    inLock(controllerContext.controllerLock) {
      // de-register partition ISR listener for on-going partition reassignment task
      deregisterReassignedPartitionsIsrChangeListeners()
      // shutdown partition state machine
      partitionStateMachine.shutdown()
      // shutdown replica state machine
      replicaStateMachine.shutdown()
      // shutdown controller channel manager
      if(controllerContext.controllerChannelManager != null) {
        controllerContext.controllerChannelManager.shutdown()
        controllerContext.controllerChannelManager = null
      }
      // reset controller context
      controllerContext.epoch=0
      controllerContext.epochZkVersion=0
      brokerState.newState(RunningAsBroker)

    }
  }

How KafkaController becomes the leader

KafkaController contains a ZookeeperLeaderElector, which uses ZooKeeper to decide through election whether this broker is the leader.

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
……
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  onControllerResignation, config.brokerId)

/**
 * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
 * is the controller. It merely registers the session expiration listener and starts the controller leader
 * elector
 */
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up");
    registerSessionExpirationListener()  // register a session-expiration listener
    isRunning = true
    controllerElector.startup  // start the controllerElector
    info("Controller startup complete")
  }
}
}

The ZooKeeper election path is /controller, and a session-expiration listener is registered against the ZooKeeper cluster:

class SessionExpirationListener() extends IZkStateListener with Logging {
  this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
  @throws(classOf[Exception])
  def handleStateChanged(state: KeeperState) {
    // do nothing, since zkclient will do reconnect for us.
  }
  /**
   * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
   * any ephemeral nodes here.
   *
   * @throws Exception
   *             On any error.
   */
  @throws(classOf[Exception])
  def handleNewSession() {
    info("ZK expired; shut down all controller components and try to re-elect")
    inLock(controllerContext.controllerLock) {
      onControllerResignation()  // the session expired and has been re-established: call the onControllerResignation callback first
      controllerElector.elect  // then re-run the election
    }
  }
}

So the logic inside ZookeeperLeaderElector deserves close attention:

class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  var leaderId = -1
  // create the election path in ZK, if one does not exist
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  def startup {
    inLock(controllerContext.controllerLock) {  // electionPath is /controller (ZkUtils.ControllerPath)
      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect  // trigger the election
    }
  }

  private def getControllerID(): Int = {
    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => -1
    }
  }
    
  def elect: Boolean = {
    val timestamp = SystemTime.milliseconds.toString
    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
   
   leaderId = getControllerID 
    /* 
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
     * it's possible that the controller has already been elected when we get here. This check will prevent the following 
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    if(leaderId != -1) {
       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
       return amILeader
    }

    // The election works by creating an ephemeral node in ZK: if several clients
    // race to create a node at the same path, exactly one succeeds and the rest
    // fail. When the winner's connection to ZK is lost, the node disappears and
    // the other clients get to compete again.
    try {
      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
        (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
        controllerContext.zkSessionTimeout)
      info(brokerId + " successfully elected as leader")
      leaderId = brokerId
      onBecomingLeader()  // creation succeeded: this broker is now the leader
    } catch {
      case e: ZkNodeExistsException =>
        // If someone else has written the path, then
        leaderId = getControllerID 

        if (leaderId != -1)
          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
        else
          warn("A leader has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
        resign()  // unexpected error: delete the path and give up leadership
    }
    amILeader
  }

  def close = {
    leaderId = -1
  }

  def amILeader : Boolean = leaderId == brokerId

  def resign() = {
    leaderId = -1
    deletePath(controllerContext.zkClient, electionPath)
  }

  /**
   * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
   * have its own session expiration listener and handler
   */
  class LeaderChangeListener extends IZkDataListener with Logging {
    /**
     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      inLock(controllerContext.controllerLock) {
        leaderId = KafkaController.parseControllerId(data.toString)
        info("New leader is %d".format(leaderId))
      }
    }

    /**
     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {  // a broker that lost the first election re-triggers it here, once it sees the node is gone
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if(amILeader)  // we may have been the leader before; we might not win again, so clear all cached leader state first
          onResigningAsLeader()
        elect  // trigger the election
      }
    }
  }
}

So there are two ways for a KafkaController to become the leader:

  1. On its first startup it actively triggers elect; if it wins, it starts doing the leader's work.
  2. If it loses that first election, it watches the /controller path through LeaderChangeListener; when the data underneath is deleted, handleDataDeleted fires and the election is triggered again.

KafkaController initialization as the leader

As seen in the previous section, when KafkaController wins the election it invokes onBecomingLeader, and when a previous leader triggers a new election it invokes onResigningAsLeader. These two callbacks correspond to onControllerFailover and onControllerResignation respectively.

onControllerResignation is simple: it shuts down or deregisters all the internal modules:

def onControllerResignation() {
  // de-register listeners
  deregisterReassignedPartitionsListener()
  deregisterPreferredReplicaElectionListener()
  // shutdown delete topic manager
  if (deleteTopicManager != null)
    deleteTopicManager.shutdown()
  // shutdown leader rebalance scheduler
  if (config.autoLeaderRebalanceEnable)
    autoRebalanceScheduler.shutdown()
  inLock(controllerContext.controllerLock) {
    // de-register partition ISR listener for on-going partition reassignment task
    deregisterReassignedPartitionsIsrChangeListeners()
    // shutdown partition state machine
    partitionStateMachine.shutdown()
    // shutdown replica state machine
    replicaStateMachine.shutdown()
    // shutdown controller channel manager
    if(controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
    // reset controller context
    controllerContext.epoch=0
    controllerContext.epochZkVersion=0
    brokerState.newState(RunningAsBroker)
  }
}

The modules above will be introduced along with onControllerFailover, which essentially turns all of them on.

The logic of onControllerFailover is as follows:

 def onControllerFailover() {
    if(isRunning) {
      info("Broker %d starting become controller state transition".format(config.brokerId))
      readControllerEpochFromZookeeper()
      // bump the controller epoch: it increases by 1 on every successful election
      incrementControllerEpoch(zkClient)
      /* leader initialization: see the numbered steps after this code block */
      registerReassignedPartitionsListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
      brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

The steps are:

1) Register the partitionReassignedListener on /admin/reassign_partitions.

2) Register the preferredReplicaElectionListener on /admin/preferred_replica_election.

3) Register the topicChangeListener on /brokers/topics.

4) Register the deleteTopicsListener on /admin/delete_topics.

5) Register the brokerChangeListener on /brokers/ids.

6) Initialize the ControllerContext, which holds all the topic metadata. In addition, the ControllerChannelManager inside ControllerContext is responsible for establishing channels to the other KafkaServers in the cluster for communication, and the TopicDeletionManager is responsible for topic deletion.

7) Initialize the state of every replica through the replicaStateMachine.

8) Initialize the state of every partition through the partitionStateMachine.

9) Register an AddPartitionsListener under brokers/topics/*** (each concrete topic name).

10) Handle any partition reassignment left over from a previous run.

11) Handle any preferred replica election left over from a previous run.

12) Send the cluster's topic metadata to the other KafkaServers so their view is synchronized and up to date.

13) Depending on configuration, enable automatic leader rebalancing.

14) Start topic deletion.

KafkaController manages the Kafka cluster's metadata mainly through the listeners above. Next we will first describe in detail how PartitionStateMachine and ReplicaStateMachine work, since the state and contents of a topic's partitions are handled primarily by these two classes, and then walk through the role of each listener following the flow above.
