
ReplicaManager Source Code Analysis 1: Message Sync Thread Management

2017-03-08  扫帚的影子

OffsetCheckPoint class
AbstractFetcherManager class
ReplicaFetcherManager class
AbstractFetcherThread class

The heart of AbstractFetcherThread is its doWork() loop:

override def doWork() {
  val fetchRequest = inLock(partitionMapLock) {
    val fetchRequest = buildFetchRequest(partitionMap)
    if (fetchRequest.isEmpty) {
      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    fetchRequest
  }

  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest)
}

Essentially it does three things: build the FetchRequest, send it synchronously and receive the FetchResponse, and process the FetchResponse. The implementation of these three steps relies on the following methods:

def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)

  // handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long

  // deal with partitions with errors, potentially due to leadership changes
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])

  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ

  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]

All of these methods are implemented in the concrete subclasses; we walk through them below using the ReplicaFetcherThread class.
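
To see how the pieces fit together, here is a simplified, self-contained sketch of the template-method structure. It is not the real Kafka classes: the names MiniFetcherThread, doWorkOnce and the Option-based empty-request check are simplifications introduced only for illustration; the base loop drives build → fetch → process, and the concrete subclass supplies the hooks.

// Simplified sketch (hypothetical types, not the real Kafka classes): the point is the
// template-method structure -- the base loop drives the work, subclasses fill in the hooks.
object FetcherSketch {
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionFetchState(offset: Long, active: Boolean = true)

  // REQ and PD stand in for the request/response payload types of a concrete subclass.
  abstract class MiniFetcherThread[REQ, PD] {
    protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): Option[REQ]
    protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
    def processPartitionData(tp: TopicAndPartition, fetchOffset: Long, partitionData: PD): Unit
    def handleOffsetOutOfRange(tp: TopicAndPartition): Long
    def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): Unit

    // The three steps described above: build the request, fetch synchronously, process the response.
    def doWorkOnce(partitionMap: Map[TopicAndPartition, PartitionFetchState]): Unit =
      buildFetchRequest(partitionMap).foreach { request =>
        val response = fetch(request)
        response.foreach { case (tp, data) =>
          processPartitionData(tp, partitionMap(tp).offset, data)
        }
      }
  }
}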

ReplicaFetcherThread class
handleOffsetOutOfRange, called when the fetch offset turns out to be out of range on the leader, works as follows (a simplified sketch follows this list):
  1. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId) first sends an OffsetRequest to the leader to obtain the leader's current LogEndOffset;
  2. If the leader's LogEndOffset is smaller than the current replica's logEndOffset: in principle this should be impossible, unless an unclean leader election has happened, i.e. all brokers in the ISR died and a replica outside the ISR became the leader;
  3. If the broker's configuration does not allow unclean leader election, call Runtime.getRuntime.halt(1);
  4. If the broker's configuration does allow unclean leader election, the current replica's local log is truncated down to the leader's LogEndOffset;
  5. If the leader's LogEndOffset is larger than the current replica's logEndOffset, the leader has valid data for this replica to sync; the remaining question is where to start from;
  6. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId) sends an OffsetRequest to the leader to obtain the leader's oldest valid offset: StartOffset;
  7. Truncate once more and start appending from that StartOffset: replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
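
The branching above is easier to follow as code. The sketch below only mirrors the decision flow just described; the parameters and the two truncate callbacks are hypothetical stand-ins, not the real ReplicaFetcherThread or LogManager APIs.

// Simplified sketch of the steps above; all parameters are hypothetical stand-ins.
object OffsetOutOfRangeSketch {
  def handleOffsetOutOfRangeSketch(
      replicaLogEndOffset: Long,
      leaderLatestOffset: Long,            // leader LEO, via ListOffsetRequest.LATEST_TIMESTAMP
      leaderEarliestOffset: Long,          // leader StartOffset, via ListOffsetRequest.EARLIEST_TIMESTAMP
      uncleanLeaderElectionAllowed: Boolean,
      truncateTo: Long => Unit,            // truncate the local log down to a given offset
      truncateFullyAndStartAt: Long => Unit): Long = {
    if (leaderLatestOffset < replicaLogEndOffset) {
      // Follower is ahead of the leader: only possible after an unclean leader election.
      if (!uncleanLeaderElectionAllowed)
        Runtime.getRuntime.halt(1)         // refuse to silently lose data
      truncateTo(leaderLatestOffset)       // step 4: drop everything past the leader's LEO
      leaderLatestOffset
    } else {
      // Leader has data the follower does not: restart syncing from the leader's StartOffset.
      truncateFullyAndStartAt(leaderEarliestOffset)  // step 7
      leaderEarliestOffset
    }
  }
}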
buildFetchRequest assembles the per-partition request map:

val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]

partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
  if (partitionFetchState.isActive)
    requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
}

new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))

There is not much to say here: it simply follows the FetchRequest wire protocol.
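
For orientation, the fields used above roughly correspond to broker-side configuration. The values below are made-up examples; only the config names are the standard Kafka broker settings.

// Illustration only: hypothetical example values for the fields in the request above.
val replicaId = 1                // this follower's broker.id
val maxWait   = 500              // replica.fetch.wait.max.ms
val minBytes  = 1                // replica.fetch.min.bytes
val fetchSize = 1048576          // replica.fetch.max.bytes, the per-partition fetch size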

fetch sends the request to the leader and converts the response:

val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
  TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
}

It uses NetworkClient to connect to the leader broker and to send and receive the request, and uses kafka.utils.NetworkClientBlockingOps._ to turn these network operations into synchronous, blocking calls.
That implementation is covered in KafkaController分析2-NetworkClient分析.
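
The idea behind the blocking wrapper is simple: keep polling the non-blocking client until the response for the request we just sent arrives, or a deadline passes. The sketch below uses a hypothetical MiniClient interface, not the real NetworkClient or NetworkClientBlockingOps API.

object BlockingSendSketch {
  // Hypothetical minimal view of a non-blocking client: send() queues a request,
  // poll() makes progress on I/O and returns any completed responses.
  trait MiniClient[Req, Resp] {
    def send(request: Req): Unit
    def poll(timeoutMs: Long): Seq[Resp]
  }

  // Turn the non-blocking client into a synchronous call: send once, then poll
  // until a response arrives or the deadline passes.
  def blockingSendAndReceive[Req, Resp](client: MiniClient[Req, Resp],
                                        request: Req,
                                        timeoutMs: Long): Option[Resp] = {
    client.send(request)
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[Resp] = None
    while (result.isEmpty && System.currentTimeMillis() < deadline) {
      val remaining = deadline - System.currentTimeMillis()
      result = client.poll(math.max(0L, remaining)).headOption
    }
    result
  }
}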

processPartitionData appends the fetched data to the local replica's log:

try {
  val TopicAndPartition(topic, partitionId) = topicAndPartition
  val replica = replicaMgr.getReplica(topic, partitionId).get
  val messageSet = partitionData.toByteBufferMessageSet
  warnIfMessageOversized(messageSet)

  if (fetchOffset != replica.logEndOffset.messageOffset)
    throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
  replica.log.get.append(messageSet, assignOffsets = false)
  val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
  replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
        .format(replica.brokerId, topic, partitionId, followerHighWatermark))
} catch {
  case e: KafkaStorageException =>
    fatal("Disk error while replicating data.", e)
    Runtime.getRuntime.halt(1)
}

It does three things:

  1. Append the fetched messages to the corresponding replica's log;
  2. Update the replica's highWatermark;
  3. On a KafkaStorageException, halt the process.
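
The high-watermark update is just a min: the follower can never advance its HW past its own log end offset, nor past the HW the leader reported in the fetch response. A trivial illustration with made-up numbers:

// Made-up numbers: the follower has appended up to offset 120, the leader reports HW 100.
val followerLogEndOffset  = 120L
val leaderHighWatermark   = 100L
val followerHighWatermark = followerLogEndOffset.min(leaderHighWatermark)  // 100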
handlePartitionsWithErrors:

delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)

The current approach is simply to delay fetching for these partitions for a while.
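
A minimal sketch of the delay idea, assuming a hypothetical per-partition fetch-state map: the fetcher keeps per-partition state and skips any partition whose back-off has not yet expired. The names below are illustrative, not the real AbstractFetcherThread fields.

object DelaySketch {
  case class TopicAndPartition(topic: String, partition: Int)

  // Hypothetical fetch state: a partition is only fetched again once its delay has expired.
  case class FetchState(offset: Long, delayedUntilMs: Long = 0L) {
    def isActive(nowMs: Long): Boolean = nowMs >= delayedUntilMs
  }

  // Mark the failed partitions so they are skipped for the next `delayMs` milliseconds.
  def delayPartitions(states: Map[TopicAndPartition, FetchState],
                      partitions: Iterable[TopicAndPartition],
                      delayMs: Long,
                      nowMs: Long): Map[TopicAndPartition, FetchState] =
    partitions.foldLeft(states) { (acc, tp) =>
      acc.get(tp) match {
        case Some(state) => acc.updated(tp, state.copy(delayedUntilMs = nowMs + delayMs))
        case None        => acc
      }
    }
}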

Series index: Kafka源码分析-汇总
