玩转大数据程序员今日看点

Kafka集群建立过程分析

2017-02-03  本文已影响861人  扫帚的影子

第一个broker(我们叫它B1)启动

a. 更新zk上的controller epoch信息;
b. 注册zk上的broker/topic节点变化事件通知;
c. 初始化ControllerContext, 主要是从zk上获取broker, topic, parition, isr, partition leader, replicas等信息;
d. 启动ReplicaStateMachine;
e. 启动PartitionStateMachine;
f. 发送所有的partition信息(leader, isr, replica, epoch等)到所有的 live brokers;
g. 如果允许自动leader rebalance的话, 则启动AutoRebalanceScheduler;

def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              if(newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)
            }
          }
        }
      }
    }
  }

干三件事:

  1. 更新ControllerContext.liveBrokers;
  2. 获取新增的broker列表, 回调KafkaController.onBrokerStartup(newBrokerIds.toSeq);
  3. 获取死掉的broker列表, 回调KafkaController.onBrokerFailure(deadBrokerIds.toSeq);

创建Topic

1553745402.jpg
           val currentChildren = {
              import JavaConversions._
              (children: Buffer[String]).toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren

            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            if(newTopics.size > 0)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)

干三件事

  1. 更新ControllerContext.allTopics;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回调KafkaController.onNewTopicCreation;
    partitionStateMachine.registerPartitionChangeListener(topic)
    onNewPartitionCreation(newPartitions)

onNewPartitionCreation的处理逻辑:

    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  1. 针对每个新topic注册PartitionChangeListener, 监控其partition数量的改变;
  2. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): 将partition状态变为NewPartition;
  3. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica):将replica状态变为NewReplica, 由于目前partition并没有进行选主操作,因此无其他操作被触发;
  4. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):
    4.1 将partition状态由NewPartition -> OnlinePartition;
    4.2 选取replicas列表的head作为leader, 将leader, isr信息写入zk的/brokers/topics/[topic]/partitions/[partition id]/state
    4.3 BrokerRequestBatch.addLeaderAndIsrRequestForBrokers: 构造 LeaderAndIsr Request,发送到各live broker, 这个request由broker内部的ReplicaManager组件处理,我们后面会有专门的章节来分析它;
    4.4 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): 将replica状态由NewReplica -> OnlineReplica;

第二个broker(我们叫它B2)启动

  1. sendUpdateMetadataRequest: 发送所有topic的partitionStateInfos到各broker;
  2. 针对新启动的broker, 调用replicaStateMachine.handleStateChanges更新replica状态到OnlineReplica:
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  1. partitionStateMachine.triggerOnlinePartitionStateChange():针对new and offline partitions进行选主;

针对已存在的topic, 在第二个broker B2上新增一个patition

          val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
          val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
            !controllerContext.partitionReplicaAssignment.contains(p._1))
          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
            error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                  .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
          else {
            if (partitionsToBeAdded.size > 0) {
              info("New partitions to be added %s".format(partitionsToBeAdded))
              controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
            }
          }

干三件事

  1. 获取topic新增的partition的replicas信息;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回调KafkaController.onNewPartitionCreation;
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

Kafka源码分析-汇总

上一篇 下一篇

猜你喜欢

热点阅读