Apache Kafka@IT·大数据玩转大数据

KafkaController分析4-Partition选主

2017-01-17  本文已影响513人  扫帚的影子

PartitionLeaderSelector

/**
   * @param topicAndPartition          The topic and partition whose leader needs to be elected
   * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
   * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
   * the LeaderAndIsrRequest.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])

OfflinePartitionLeaderSelector

Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):

  1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live isr as the new isr.
  2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
  3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
  4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
    Replicas to receive LeaderAndIsr request = live assigned replicas
  5. Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
PartitionLeaderSelector.png

ReassignedPartitionLeaderSelector

  • New leader = a live in-sync reassigned replica

ControlledShutdownLeaderSelector

  • New leader = replica in isr that's not being shutdown;

NoOpLeaderSelector

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
Kafka源码分析-汇总
上一篇 下一篇

猜你喜欢

热点阅读