A Detailed Look at num.recovery.threads.per.data.
2018-12-01
Woople
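During startup, a Kafka broker loads the log files of every topic stored on that node. When the data volume is very large, this loading can take a long time; increasing num.recovery.threads.per.data.dir speeds up log recovery. The default value of num.recovery.threads.per.data.dir is 1.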
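A minimal sketch of how a broker setting like this might be read and defaulted, assuming plain java.util.Properties rather than Kafka's actual KafkaConfig machinery. The object RecoveryThreadsConfig and its helper recoveryThreadsPerDataDir are hypothetical. The sketch falls back to the default of 1 when the key is absent and, as its own assumption, rejects values below 1, because Executors.newFixedThreadPool throws IllegalArgumentException for a non-positive thread count.

```scala
import java.util.Properties

// Hypothetical helper, not Kafka's KafkaConfig.
object RecoveryThreadsConfig {
  // Broker property name discussed in this article.
  val Key = "num.recovery.threads.per.data.dir"
  // Default value of the setting.
  val Default = 1

  // Reads the setting, falling back to the default when the key is missing.
  def recoveryThreadsPerDataDir(props: Properties): Int = {
    val value = Option(props.getProperty(Key)).map(_.trim.toInt).getOrElse(Default)
    // Assumption of this sketch: Executors.newFixedThreadPool(n) throws
    // IllegalArgumentException when n <= 0, so reject such values up front.
    require(value >= 1, s"$Key must be at least 1, got $value")
    value
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    println(recoveryThreadsPerDataDir(props)) // 1
    props.setProperty(Key, "8")
    println(recoveryThreadsPerDataDir(props)) // 8
  }
}
```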
Source Code Analysis
Note: the source code in this article is based on kafka-0.10.1.1.
Logs are loaded by LogManager.loadLogs(); part of that method is shown below:
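val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- this.logDirs) {
  // one fixed-size pool per data directory, with ioThreads worker threads
  val pool = Executors.newFixedThreadPool(ioThreads)
  // ... (rest of the loop body omitted in this excerpt)
}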
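As you can see, a separate thread pool is created for each log directory, and its thread count is given by the variable ioThreads, which is exactly the value of num.recovery.threads.per.data.dir. See the following fragment of the KafkaServer.createLogManager method: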
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
               topicConfigs = configs,
               defaultConfig = defaultLogConfig,
               cleanerConfig = cleanerConfig,
               // num.recovery.threads.per.data.dir becomes LogManager's ioThreads
               ioThreads = config.numRecoveryThreadsPerDataDir,
               flushCheckMs = config.logFlushSchedulerIntervalMs,
               flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
               retentionCheckMs = config.logCleanupIntervalMs,
               scheduler = kafkaScheduler,
               brokerState = brokerState,
               time = time)
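The pattern in the two excerpts above, one fixed-size pool of ioThreads threads per data directory, can be sketched in isolation as follows. This is not Kafka's code: data directories are plain strings, each directory's partition logs are a list of names, and recoverLog is a hypothetical stand-in for the real per-log loading and recovery work. Because every directory gets its own pool and the pools run side by side, up to (number of data directories) × ioThreads logs can be recovered concurrently.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

object ParallelLogRecoverySketch {
  // Hypothetical stand-in for loading/recovering one partition log.
  def recoverLog(dir: String, log: String, recovered: AtomicInteger): Unit = {
    recovered.incrementAndGet()
  }

  // Mirrors the shape of LogManager.loadLogs(): one pool of ioThreads threads per data dir.
  def loadLogs(logsByDir: Map[String, Seq[String]], ioThreads: Int): Int = {
    val recovered = new AtomicInteger(0)
    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
    val jobs = mutable.Map.empty[String, Seq[Future[Unit]]]
    try {
      for ((dir, logs) <- logsByDir) {
        val pool = Executors.newFixedThreadPool(ioThreads)
        threadPools += pool
        // Each log in this directory becomes one job; at most ioThreads run at once.
        jobs(dir) = logs.map { log =>
          pool.submit(new Callable[Unit] {
            override def call(): Unit = recoverLog(dir, log, recovered)
          })
        }
      }
      // Block until every job finishes; get() rethrows any failure from a job.
      jobs.values.flatten.foreach(_.get())
    } finally {
      threadPools.foreach(_.shutdown())
      threadPools.foreach(_.awaitTermination(1, TimeUnit.MINUTES))
    }
    recovered.get()
  }

  def main(args: Array[String]): Unit = {
    val logsByDir = Map(
      "/data1" -> Seq("topicA-0", "topicA-1", "topicB-0"),
      "/data2" -> Seq("topicA-2", "topicB-1")
    )
    println(loadLogs(logsByDir, ioThreads = 2)) // 5
  }
}
```

With two data directories and ioThreads = 2, this sketch runs at most four recovery jobs at a time; raising ioThreads only helps while there are more logs per directory than threads and the disks can keep up.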