图解Kafka

KafkaProducer之写缓存

2020-09-10  本文已影响0人  _孙行者_

从 KafkaProducer 到 DefaultRecord ,经过了 7 个主要类 。流程如下:

Broker-写缓存.png

①:send() 方法,是封装的 doSend() 方法。在 doSend() 方法中执行了写缓存。

②:doSend() 里调用 RecordAccumulator.append() 方法。这是 RecordAccumulator的关键方法。

③:apend() 方法里会多次执行 tryAppend() 的逻辑。
④:调用 BufferPool.allocate(size) 从缓冲池里,分配一块 byteBuffer 缓存。

⑤:bufferPool 分配缓存,并生成 byteBuffer 对象,得到的 byteBuffer 对象可能是已有的,也可能是从内存中新分配出来的。

⑥:MemoryRecords 根据byteBuffer , 生成 MemoryRecordsBuilder 对象,MemoryRecordsBuilder 持有 byteBuffer 。

⑦:在RecordAccumulator中得到 MemoryRecordsBuilder 对象,并构建一个 ProducerBatch 批处理对象

⑧:MemoryRecordsBuilder 对象传递给 ProducerBatch 。

⑨:ProducerBatch 是相同TopicPartition一批数据的集合,一个 ProducerBatch 可以保存一批数据写到 MemoryRecordsBuilder 。

⑩:当一批中的数据量满了,MemoryRecordsBuilder 校验空间不足时,返回RecordAccumulator,再次重新分配新的缓存。

⑪:空间足够,append() 是不同参数的封装方法,可以进行下一步操作。

⑫:校验数据的合法性。
⑬:DefaultRecord 封装了一些静态方法,方便写数据到outStream中
⑭:调用ByteUtil 方法,执行真正的写数据到 outStream 中。
⑮:得到最终的,持有一批数据的 DataOutputStream 。

总结

目前看到的是 , BufferPool 分配了一个 和Key,Value一样大小的ByteBuffer 。 但是key和value没有写到 ByteBuffer 中。而是写到了 DataOutputStream 。

那么这个 BufferPool 到底是何用呢?

TODO:

后记 2020.09.10

public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit) {
        if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
            throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
        if (magic < RecordBatch.MAGIC_VALUE_V2) {
            if (isTransactional)
                throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
            if (isControlBatch)
                throw new IllegalArgumentException("Control records are not supported for magic " + magic);
            if (compressionType == CompressionType.ZSTD)
                throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
        }

        this.magic = magic;
        this.timestampType = timestampType;
        this.compressionType = compressionType;
        this.baseOffset = baseOffset;
        this.logAppendTime = logAppendTime;
        this.numRecords = 0;
        this.uncompressedRecordsSizeInBytes = 0;
        this.actualCompressionRatio = 1;
        this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.baseSequence = baseSequence;
        this.isTransactional = isTransactional;
        this.isControlBatch = isControlBatch;
        this.partitionLeaderEpoch = partitionLeaderEpoch;
        this.writeLimit = writeLimit;
        this.initialPosition = bufferStream.position();
        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);

        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        //ByteBufferOutputStream , 里面持有了 byteBuffer 
        this.bufferStream = bufferStream;
        // DataOutputStream 里持有的是 ByteBufferOutputStream 
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }

之前看漏了 , DataOutputStream 里 持有的就是 ByteBufferOutputStream, 即 缓存的 byteBuffer 。 所以 往 DataOutputStream 里写数据, 就是往缓存写数据。

不能漏一丝细节!


如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

上一篇 下一篇

猜你喜欢

热点阅读