Kafka相关大数据开发

大数据开发:Kafka日志删除与日志压缩

2021-06-01  本文已影响0人  成都加米谷大数据

在实时流数据处理当中,随着运行时间不断加长,日志所占据的空间会不断变大,沉冗的数据会占据过多的空间,也拉低运行的效率,因此也需要适时地对日志进行清理。今天的大数据开发学习分享,我们就主要来讲讲,Kafka日志删除与日志压缩。

Kafka提供了日志删除(delete)和日志压缩(compact)两种清理日志的策略,可以通过参数cleanup.policy为每个主题指定不同的清理策略,也可以在代理启动时通过配置项log.cleanup.policy指定日志清理策略,这样该代理上的所有分区日志清理默认使用该配置设置的策略,主题级别的策略设置会覆盖代理级别的配置。

Kafka日志删除

在日志管理器启动时会启动一个后台定时任务线程用于定时删除日志段文件,每隔${log.retention.check.interval.ms}毫秒检查一次是否进行日志删除,默认是每5分钟执行一次。Kafka提供了基于日志保留时长和日志段大小两种日志删除配置方式,默认是以日志保留时长来进行日志删除操作。

1.基于日志保留时长

基于日志保留时长的配置有log.retention.hours、log.retention.minutes和log.retention.ms,这3种时间配置项的单位依次为时、分、毫秒,表示日志段保留时长,默认是设置log.retention.hours=168,即168小时,日志段文件被保留7天之后会被清理。

2.基于日志段大小

通过配置项log.retention.bytes设置日志段大小,删除逻辑是:首先计算日志段总大小与retention.bytes之间的差值(diff),得到需要删除的日志总大小,然后从第一个日志段开始查找,若diff与该日志段字节之差不小于0,则将该日志段加入到待删除集合中,以此类推,直到diff与查找的日志段大小之差小于0,查找结束,迭代待删除的日志段文件,进行物理删除。

Kafka日志压缩

日志压缩是一种更细粒度的清理策略,基于消息的Key,通过压缩每个Key对应的消息只保留最后一个版本的数据,该Key对应的其他版本在压缩时会被清除,同时压缩策略将Key对应的值为空的消息,认为是直接删除该条消息。为了不影响日志追加操作,日志压缩并不会对活跃段进行操作。

日志压缩的配置项是log.cleaner.enable=true,默认是开启,然后设置主题级别的日志清理策略配置项为compact。

Kafka的日志压缩状态有3种,当Log开始进行压缩时压缩状态为LogCleaningInProgress,当压缩任务为暂停时就会进入LogCleaningPaused,当压缩被终止时就会转换为LogCleaningAborted状态。暂停和终止状态时不会进行日志压缩,需要等其他线程将其恢复为压缩状态。

日志压缩会生成清理检查点文件,记录每个主题的每个分区TopicAndPartition清理的偏移量。通过该文件,可以将数据文件分成两部分,一部分是已经过压缩操作的clean段,另一部分是未经过压缩操作的dirty段。其中经过压缩的clean段中的偏移量不是连续递增的,而dirty段的偏移量则是连续递增的。如下图所示:

压缩逻辑是从本次需要清理的日志起始位置与最大结束位置开始遍历,因为活跃段不参与日志压缩,所以最大结束位置显然是活跃段的起始位置,将每个消息的Key及该Key对应消息的偏移量保存到一个固定容量的SkimpyOffsetMap中,由于是Map,因此当Key相同时后面的值会覆盖先前的值,这样就保证了相同Key的消息只保留最晚的值。

最后,对两个索引文件进行处理,去掉多余的索引项,同时将压缩后的日志段数据刷到磁盘。

需要注意将日志压缩与日志删除区分开,日志删除是删除整个日志段,而日志压缩是将相同key的日志进行合并,只保留该key最后一个值,将合并后的数据构成新的日志段,同时删除原来的日志段。日志压缩过程如下图所示:

关于大数据开发学习,Kafka日志删除与日志压缩,以上就为大家做了大致的讲解了。Kafka的日志管理,我们从日志结构到加载恢复到删除压缩,都为大家讲解完了,大家可以联系起来一起理解。

上一篇 下一篇

猜你喜欢

热点阅读