Kafka Log Deletion Source Code Analysis

2018-02-02  LancerLin_LX

Problem Description

1. Identifying the Problem

Recently I needed to use Flume to collect historical data from 神策 (Sensors Analytics). The data volume is fairly large: roughly 20 million records per day, with about a month's worth to collect. I then discovered that the data was being deleted from Kafka before it could even be consumed.

2. Handling the Problem

Checking the Kafka configuration revealed that both parameters governing the log deletion policy were set: `log.retention.hours` and `log.retention.bytes`.
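For context, a broker configured this way would look roughly like the sketch below (the values are illustrative, not the actual settings from the incident):

```scala
import java.util.Properties

// Hypothetical broker settings reproducing the situation: the time-based
// and the size-based retention limits are both active at the same time.
val brokerProps = new Properties()
brokerProps.put("log.retention.hours", "168")        // delete segments older than 7 days...
brokerProps.put("log.retention.bytes", "1073741824") // ...OR once a partition log exceeds 1 GiB
```

With both present, whichever limit is hit first wins, which is exactly what the source below confirms.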
1. Look at the Kafka 0.9.0.0 source code, starting with the `LogManager` class:

```scala
def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
  // Retention deletion only applies to logs that do not use log compaction.
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
                (time.milliseconds - startMs) / 1000 + " seconds")
}
```

2. Look at the two methods in `total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)`. The two deletion conditions are OR'ed: a segment is deleted if either one is satisfied.
First, the `cleanupExpiredSegments(log)` method:

```scala
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
```
The retention time `log.config.retentionMs` used in that check is derived in `getLogRetentionTimeMillis`:
```scala
private def getLogRetentionTimeMillis: Long = {
  val millisInMinute = 60L * 1000L
  val millisInHour = 60L * millisInMinute

  val millis: java.lang.Long =
    Option(getLong(KafkaConfig.LogRetentionTimeMillisProp)).getOrElse(
      Option(getInt(KafkaConfig.LogRetentionTimeMinutesProp)) match {
        case Some(mins) => millisInMinute * mins
        case None => getInt(KafkaConfig.LogRetentionTimeHoursProp) * millisInHour
      })

  if (millis < 0) return -1
  millis // in short: determined by log.retention.ms, log.retention.minutes, and log.retention.hours (in that priority order), converted to milliseconds
}
```
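As an aside, the precedence is easy to verify with a standalone re-implementation (my own sketch, not Kafka code): `log.retention.ms` beats `log.retention.minutes`, which beats `log.retention.hours`.

```scala
// Sketch of the same priority logic; the Option arguments stand in for
// the broker's configured values (None means the property is unset).
def retentionMillis(ms: Option[Long], minutes: Option[Int], hours: Int): Long = {
  val millis = ms.getOrElse(minutes match {
    case Some(m) => m * 60L * 1000L
    case None    => hours * 60L * 60L * 1000L
  })
  if (millis < 0) -1L else millis
}

println(retentionMillis(None, None, 168))           // 604800000 (7 days, from hours)
println(retentionMillis(Some(1000L), Some(5), 168)) // 1000 (ms wins over minutes and hours)
```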

So the `cleanupExpiredSegments(log)` method deletes segments based on the retention time; a toy illustration of its predicate follows below.
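In this sketch, `LogSegmentStub` is a made-up stand-in for Kafka's `LogSegment` (which exposes a last-modified timestamp in milliseconds); a segment becomes eligible once it is older than the retention window.

```scala
// Toy model of the time-based check in cleanupExpiredSegments.
case class LogSegmentStub(lastModified: Long)

def expired(seg: LogSegmentStub, nowMs: Long, retentionMs: Long): Boolean =
  nowMs - seg.lastModified > retentionMs

val now       = System.currentTimeMillis()
val eightDays = 8L * 24 * 60 * 60 * 1000
val sevenDays = 7L * 24 * 60 * 60 * 1000
println(expired(LogSegmentStub(now - eightDays), now, sevenDays)) // true: past retention
```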

Next, look at the `cleanupSegmentsToMaintainSize(log)` method:

```scala
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if(diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}
```

This method, by contrast, decides based on total log size; the controlling parameter is `log.retention.bytes`. A toy replay of its `shouldDelete` accumulator is sketched below.
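With invented segment sizes: the log is 180 bytes against a 100-byte limit, so `diff` starts at 80, and only the oldest 60-byte segment fits inside the overshoot; deleting a second one would cut below the retention size, so it stays.

```scala
// Toy replay of the shouldDelete accumulator; sizes are invented.
// Kafka offers segments to the predicate from oldest to newest and
// stops at the first segment the predicate rejects.
val retentionSize = 100L
val segmentSizes  = List(60L, 60L, 60L)     // oldest first; total log size 180
var diff = segmentSizes.sum - retentionSize // 80 bytes over the limit

val deleted = segmentSizes.takeWhile { size =>
  if (diff - size >= 0) { diff -= size; true } else false
}
println(deleted) // List(60): one segment deleted, the log stays slightly over the limit
```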
Summary: Kafka's deletion logic is driven by both time and size, configured through `log.retention.hours` and `log.retention.bytes`. If you want purely time-based deletion, leave `log.retention.bytes` unset; it defaults to -1, which disables the size check. Today's problem was precisely that both parameters were set, so data was deleted as soon as the log grew past the size limit.
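A minimal sketch of the corrected, time-only configuration (values illustrative):

```scala
import java.util.Properties

// Time-based retention only: with log.retention.bytes at -1 (the default),
// cleanupSegmentsToMaintainSize always returns 0 and never deletes anything.
val fixedProps = new Properties()
fixedProps.put("log.retention.hours", "168")
fixedProps.put("log.retention.bytes", "-1")
```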

3. Finally

How often does Kafka actually run this log deletion?
See the `startup` method in the `LogManager` class:

```scala
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
```
Tracing `retentionCheckMs`: it is set from the `log.retention.check.interval.ms` parameter (I won't walk through the lookup in detail). The default is 300 seconds, so the cleanup logic runs once every 5 minutes.
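The scheduling pattern itself is nothing exotic; a minimal sketch with a plain Java executor (this mimics the behavior, it is not Kafka's actual `KafkaScheduler`) looks like:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Run a retention check every 5 minutes, mirroring the default
// log.retention.check.interval.ms = 300000. Kafka's real scheduler uses a
// short fixed initial delay (InitialTaskDelayMs) before the first run.
val retentionCheckMs = 300000L
val scheduler = Executors.newSingleThreadScheduledExecutor()

scheduler.scheduleAtFixedRate(
  new Runnable { def run(): Unit = println("running log retention check") },
  retentionCheckMs, // initial delay
  retentionCheckMs, // period
  TimeUnit.MILLISECONDS)
```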


