Kafka

Kafka源码分析(八)消息发送可靠性——flush.messa

2020-02-28  本文已影响0人  81e2cd2747f1

继续解答问题:
Kafka怎么样才能不丢消息?

考虑一种比较极端的情况,整个Kafka集群用的是同一路电源,在掉电的情况下,消息是有可能丢失的,即便消息已经被复制所有的ISR上。默认情况下,Kafka的刷盘是异步刷盘,也就是说,把消息写进OS的Page Cache后,已经别当成持久化成功了,但是此时的消息没有被sync到磁盘,如果所有ISR的消息都在Page Cache上而不在磁盘中,整体掉电重启后,消息就再也无法被消费者消费到,那么消息也就丢失。

private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
        // code

        segment.append(largestOffset = appendInfo.lastOffset,
            largestTimestamp = appendInfo.maxTimestamp,
            shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
            records = validRecords)

        // code

        // unflushedMessages方法定义
        // def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
        // 当没有刷盘的消息累积到flushInterval时,做一次flush
        if (unflushedMessages >= config.flushInterval)
            flush()

        // code
    }
}

那万一最近没有新的消息,但是累积的消息的量又达不到,就需要依靠下面这个定时任务来做时间维度的定期flush

scheduler.schedule("kafka-log-flusher",
                flushDirtyLogs _,
                delay = InitialTaskDelayMs,
                period = flushCheckMs,
                TimeUnit.MILLISECONDS)

如果将flush.messages设置为1,那么每一条消息都会刷盘,配合前面整理的acks、min.insync.replicas,会使消息可靠性得到大幅度得提升,但是flush.messages=1会严重影响性能,可以在部分可靠性要求高的Topic级别进行配置。

上一篇下一篇

猜你喜欢

热点阅读