
Troubleshooting storage imbalance on Kafka brokers

2022-02-07  AlienPaul

Background

After new disks were added to an existing Kafka cluster, newly created topics carried heavy traffic and data began to back up. Some time later we received alerts that several disks on some broker nodes had reached the 90% usage threshold. Inspection showed that disk usage on the affected nodes was very uneven across their log directories.

Investigation

First we need to confirm Kafka's disk balancing strategy. The relevant logic starts in LogManager's getOrCreateLog method, which creates a new log for the given topic partition if the log does not already exist or isNew is true. The code is as follows:

def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): UnifiedLog = {
  // take the lock when creating or deleting a log
  logCreationOrDeletionLock synchronized {
    // look up the log for topicPartition in futureLogs (if isFuture is true) or currentLogs
    val log = getLog(topicPartition, isFuture).getOrElse {
      // if it was not found, run this block
      // this block cannot be executed concurrently
      // create the log if it has not already been created in another thread
      if (!isNew && offlineLogDirs.nonEmpty)
        throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")

      val logDirs: List[File] = {
        val preferredLogDir = preferredLogDirs.get(topicPartition)

        if (isFuture) {
          if (preferredLogDir == null)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
          else if (getLog(topicPartition).get.parentDir == preferredLogDir)
            throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
        }

        if (preferredLogDir != null)
          List(new File(preferredLogDir))
        else
          // this method picks the candidate directories for the new log
          nextLogDirs()
      }

      val logDirName = {
        if (isFuture)
          UnifiedLog.logFutureDirName(topicPartition)
        else
          UnifiedLog.logDirName(topicPartition)
      }

      // try the candidate directories one by one until a log directory is created successfully
      val logDir = logDirs
        .iterator // to prevent actually mapping the whole list, lazy map
        .map(createLogDirectory(_, logDirName))
        .find(_.isSuccess)
        .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))
        .get // If Failure, will throw

      // fetch the log config for this topic
      val config = fetchLogConfig(topicPartition.topic)
      // create the UnifiedLog object
      val log = UnifiedLog(
        dir = logDir,
        config = config,
        logStartOffset = 0L,
        recoveryPoint = 0L,
        maxProducerIdExpirationMs = maxPidExpirationMs,
        producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
        scheduler = scheduler,
        time = time,
        brokerTopicStats = brokerTopicStats,
        logDirFailureChannel = logDirFailureChannel,
        topicId = topicId,
        keepPartitionMetadataFile = keepPartitionMetadataFile)

      // register the new log in the log map
      if (isFuture)
        futureLogs.put(topicPartition, log)
      else
        currentLogs.put(topicPartition, log)

      info(s"Created log for partition $topicPartition in $logDir with properties ${config.overriddenConfigsAsLoggableString}")
      // Remove the preferred log dir since it has already been satisfied
      preferredLogDirs.remove(topicPartition)

      log
    }
    // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
    if (log.topicId.isEmpty) {
      topicId.foreach(log.assignTopicId)
    }

    // Ensure topic IDs are consistent
    topicId.foreach { topicId =>
      log.topicId.foreach { logTopicId =>
        if (topicId != logTopicId)
          throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
            s"but log already contained topic ID $logTopicId")
      }
    }
    log
  }
}

The part of this method we care about is the logic that picks the directory for the new log. It lives in the nextLogDirs method, which returns the suggested storage directories for the next partition, ordered by preference. The current implementation counts the number of logs stored in each directory and sorts the directories by that count in ascending order:

private def nextLogDirs(): List[File] = {
  if(_liveLogDirs.size == 1) {
    // if there is only one live log directory, return it
    List(_liveLogDirs.peek())
  } else {
    // count the number of logs in each parent directory (including 0 for empty directories)
    val logCounts = allLogs.groupBy(_.parentDir).map { case (parent, logs) => parent -> logs.size }
    val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
    val dirCounts = (zeros ++ logCounts).toBuffer

    // choose the directory with the least logs in it
    // sort directories by their log count in ascending order, wrap each path in a File and return
    dirCounts.sortBy(_._2).map {
      case (path: String, _: Int) => new File(path)
    }.toList
  }
}

In summary, Kafka's broker-level placement strategy is to prefer the log directory that currently holds the fewest logs: it balances by number of partition directories, not by storage capacity. Applied to the case described in the background, the newly added disk necessarily holds the fewest logs, so newly created topics land on it first. Once those topics accumulate a large amount of data, the new disk is the first to hit the usage alert threshold.
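Before making changes, it helps to confirm the skew on the affected broker. The snippet below is a minimal sketch that counts partition directories and used space under each log directory; the /data1 and /data2 paths are placeholders for whatever is configured in log.dirs. Newer Kafka releases also ship kafka-log-dirs.sh, which reports per-partition sizes through the admin API.

# Count partition directories and used space under each log directory
# (replace the example paths with the entries from log.dirs).
for d in /data1/kafka-logs /data2/kafka-logs; do
    echo "$d: $(find "$d" -mindepth 1 -maxdepth 1 -type d -name '*-*' | wc -l) partition dirs, $(du -sh "$d" | cut -f1) used"
done

# Alternatively, ask the broker directly (available in newer Kafka releases):
bin/kafka-log-dirs.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --describe --broker-list 0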

Solutions

Reassign topics

For topics already located on the problem broker, the recommendation is to reassign their partitions to other brokers.

First, check which brokers a topic's partitions are on:

bin/kafka-topics.sh --zookeeper xxx.xxx.xxx.xxx:2181 --describe --topic topic_name

Decide which topics need to be moved. For example, to move topicA and topicB, write a JSON file in the following format:

{
    "topics": [
        {"topic": "topicA"},
        {"topic": "topicB"}
    ],
    "version":1
}

Save it as topics-to-move.json.

Then run the following command. Kafka generates a proposed assignment that maps each partition to target brokers from the list given with --broker-list, balancing the partitions across them automatically:

bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,4,5" --generate 

The output looks something like this:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[0,2]},
{"topic":"topicA","partition":1,"replicas":[1,0]},
{"topic":"topicB","partition":1,"replicas":[0,1]},
{"topic":"topicA","partition":0,"replicas":[2,1]},
{"topic":"topicB","partition":0,"replicas":[2,0]},
{"topic":"topicB","partition":2,"replicas":[1,2]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"topicA","partition":2,"replicas":[3,4]},
{"topic":"topicA","partition":1,"replicas":[5,3]},
{"topic":"topicB","partition":1,"replicas":[3,4]},
{"topic":"topicA","partition":0,"replicas":[4,5]},
{"topic":"topicB","partition":0,"replicas":[5,3]},
{"topic":"topicB","partition":2,"replicas":[4,5]}]}

Next, write another JSON file, for example move-to-brokers.json, and copy the content under "Proposed partition reassignment configuration" into it. Then execute:

bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --execute

This starts the partition reassignment. To check on its progress, run:

bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --verify

which reports the completion status of each partition move.
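Reassignment copies partition data between brokers and can put noticeable load on the network and disks. If the cluster is busy, the data movement can be throttled. The sketch below passes --throttle (in bytes per second) together with --execute; running --verify after the reassignment completes removes the throttle again. Check that your Kafka version supports this option.

bin/kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file move-to-brokers.json --execute --throttle 50000000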

Reduce log retention

Review and, where necessary, lower the following retention parameters (broker-level defaults, which can also be overridden per topic):

log.retention.ms / log.retention.minutes / log.retention.hours: how long log segments are kept before they become eligible for deletion (time-based limit).
log.retention.bytes: the maximum size a partition's log may grow to before old segments are deleted (size-based limit).

Note: if both the time-based and the size-based limits are configured, they are all in effect; whichever condition is met first triggers segment deletion.
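For example, to shorten the retention of a single high-volume topic without touching the broker defaults, a per-topic override can be applied. This is a sketch using the same ZooKeeper-based tooling as the commands above (newer Kafka versions use --bootstrap-server instead of --zookeeper); retention.ms=86400000 keeps data for one day:

bin/kafka-configs.sh --zookeeper zk:2181 --alter --entity-type topics --entity-name topic_name --add-config retention.ms=86400000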

Delete unused topics

List the topics that exist in the cluster and delete the ones that are no longer in use. This frees space and is effectively a manual way of evening out the partition distribution across the disks. The delete command is:

bin/kafka-topics.sh --delete --zookeeper zk:2181 --topic topic_name
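Note that deletion only takes effect if the brokers run with delete.topic.enable=true (the default since Kafka 1.0); otherwise the topic is merely marked for deletion. Afterwards you can confirm the topic is gone by listing the topics again:

bin/kafka-topics.sh --list --zookeeper zk:2181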