Apache Kafka@IT·大数据kafka

Kafka集群Metadata管理

2017-02-04  本文已影响1445人  扫帚的影子

Metadata的存储在哪里 --- MetadataCache组件
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
    val controllerEpoch = buffer.getInt
    val leader = buffer.getInt
    val leaderEpoch = buffer.getInt
    val isrSize = buffer.getInt
    val isr = for(i <- 0 until isrSize) yield buffer.getInt
    val zkVersion = buffer.getInt
    val replicationFactor = buffer.getInt
    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
                       replicas.toSet)
  }
private var aliveBrokers: Map[Int, Broker] = Map()
MetadataCache如何获取和更新metadata信息
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
    val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
    
    authorizeClusterAction(request)

    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)

    val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))

可以看到是调用了ReplicaManager.maybeUpdateMetadataCache方法, 里面又会调用到MetadataCache.updateCache方法

      aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
          removePartitionInfo(tp.topic, tp.partition)
        } else {
          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
        }
      }

干三件事

  1. 更新aliveBrokers;
  2. 如果某个topic的的parition的leader是无效的, 则removePartitionInfo(tp.topic, tp.partition);
  3. 新增或更新某个topic的某个parition的信息, addOrUpdatePartitionInfo(tp.topic, tp.partition, info): 将信息meta信息保存到MetadataCachecache对象中;
Metadata信息从哪里来
谁使用metadata信息
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]

    //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
    val topics = if (metadataRequest.topics.isEmpty) {
      val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
      topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
    } else {
      metadataRequest.topics.toSet
    }

    //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
    var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))

    if (!authorizedTopics.isEmpty) {
      val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
      if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
        val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
        authorizer.foreach {
          az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
            authorizedTopics --= nonExistentTopics
            unauthorizedTopics ++= nonExistentTopics
          }
        }
      }
    }

    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))

    val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
    val brokers = metadataCache.getAliveBrokers
    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))

看着代码不少, 实际上比较简单:

  1. 先确定需要获取哪些topic的metadata信息, 如果request里未指定topic, 则获取当前所有的topic的metadata信息;
  2. 有效性验证,将topic分为authorizedTopicsunauthorizedTopics;
  3. 获取authorizedTopics的metadata, 注意getTopicMetadata方法是关键所在, 它会先筛选出当前不存在的topic, 如果auto.create.topics.enable=true, 则调用AdminUtils.createTopic先创建此topic, 但此时其PartitionStateInfo为空, 不过也会作为Metadata Response的一部分返回给客户端.

Kafka源码分析-汇总

上一篇下一篇

猜你喜欢

热点阅读