kafkaKafka

Kafka源码分析(七)消息发送可靠性——min.insync.

2020-02-28  本文已影响0人  81e2cd2747f1

继续解答问题:
Kafka怎么样才能不丢消息?

再次说明下acks的语义
如果acks=all,代表消息需要在所有ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了。

acks=all时,消息需要在所有ISR都被持久化成功后,这里为什么不设计成,消息需要在所有的Follower都被持久化成功后。这个当然是有原因的,之后的文章再详细讲解这个设计问题。

这里就涉及到一个关键的问题,3个ISR不代表3个Node(1 Leader + 2 Followers)。如果在一些特殊情况下,部分在ISR中的Follower被被剔除ISR,极端情况下,最后的ISR中只有Leader一个,消息被Leader持久化成功后,就给触发了DelayedProduce的tryComplete,因为完全符合上面表达的语义:如果acks=all,代表消息需要在所有ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了,只是这次所有的ISR就Leader自己。

// ISR缩减任务
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
    case Some(leaderReplica) =>
        // 获取那些没有跟上的Follower
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if(outOfSyncReplicas.nonEmpty) {
        val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
        assert(newInSyncReplicas.nonEmpty)
        info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
            newInSyncReplicas.map(_.brokerId).mkString(",")))
        // update ISR in zk and in cache
        // 把那些没有跟上的Follower从ISR中剔除
        updateIsr(newInSyncReplicas)
        // code
    }
}

// Follower是否跟上Leader的标准
def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
  // Leader统计每一个Follower的LastCaughtUpTime(上一次追赶上的时间),如果和当前时间比大于10s。
  val candidateReplicaIds = inSyncReplicaIds - localBrokerId
  val currentTimeMs = time.milliseconds()
  val leaderEndOffset = localLogOrException.logEndOffset
  candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
}

这时如果Leader挂了,消息还是丢了。

问题就出在ISR的数量是可以忽多忽少的,ISR的数量变成1的时候,情况就完全和ISR=1是相同的。

Kafka当然是可以通过参数来限制ISR的数量的: min.insync.replicas = n,代表的语义是,如果生产者acks=all,而在发送消息时,Broker的ISR数量没有达到n,Broker不能处理这条消息,需要直接给生产者报错。

当然这个语义的解释已经足够清晰得表达了下面这段代码的意思

def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
        leaderReplicaIfLocal match {
        case Some(leaderReplica) =>
            val log = leaderReplica.log.get
            val minIsr = log.config.minInSyncReplicas
            val inSyncSize = inSyncReplicas.size

            // Avoid writing to leader if there are not enough insync replicas to make it safe
            if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
                .format(topicPartition, inSyncSize, minIsr))
            }

            // code
    }
    // code
}

这样的Fast-Fail处理,在当ISR不足时,也能够避由于Leader宕机引起的消息丢失。

问题解决了么?还没有,请看下篇分解。

上一篇下一篇

猜你喜欢

热点阅读