Kafka源码分析(八)消息发送可靠性——flush.messa
2020-02-28 本文已影响0人
81e2cd2747f1
继续解答问题:
Kafka怎么样才能不丢消息?
考虑一种比较极端的情况,整个Kafka集群用的是同一路电源,在掉电的情况下,消息是有可能丢失的,即便消息已经被复制所有的ISR上。默认情况下,Kafka的刷盘是异步刷盘,也就是说,把消息写进OS的Page Cache后,已经别当成持久化成功了,但是此时的消息没有被sync到磁盘,如果所有ISR的消息都在Page Cache上而不在磁盘中,整体掉电重启后,消息就再也无法被消费者消费到,那么消息也就丢失。
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
// code
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// code
// unflushedMessages方法定义
// def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
// 当没有刷盘的消息累积到flushInterval时,做一次flush
if (unflushedMessages >= config.flushInterval)
flush()
// code
}
}
那万一最近没有新的消息,但是累积的消息的量又达不到,就需要依靠下面这个定时任务来做时间维度的定期flush
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
如果将flush.messages设置为1,那么每一条消息都会刷盘,配合前面整理的acks、min.insync.replicas,会使消息可靠性得到大幅度得提升,但是flush.messages=1会严重影响性能,可以在部分可靠性要求高的Topic级别进行配置。