KafkaProducer之写缓存
从 KafkaProducer 到 DefaultRecord ,经过了 7 个主要类 。流程如下:
Broker-写缓存.png
①:send() 方法,是封装的 doSend() 方法。在 doSend() 方法中执行了写缓存。
②:doSend() 里调用 RecordAccumulator.append() 方法。这是 RecordAccumulator的关键方法。
③:apend() 方法里会多次执行 tryAppend() 的逻辑。
④:调用 BufferPool.allocate(size) 从缓冲池里,分配一块 byteBuffer 缓存。
⑤:bufferPool 分配缓存,并生成 byteBuffer 对象,得到的 byteBuffer 对象可能是已有的,也可能是从内存中新分配出来的。
⑥:MemoryRecords 根据byteBuffer , 生成 MemoryRecordsBuilder 对象,MemoryRecordsBuilder 持有 byteBuffer 。
⑦:在RecordAccumulator中得到 MemoryRecordsBuilder 对象,并构建一个 ProducerBatch 批处理对象
⑧:MemoryRecordsBuilder 对象传递给 ProducerBatch 。
⑨:ProducerBatch 是相同TopicPartition一批数据的集合,一个 ProducerBatch 可以保存一批数据写到 MemoryRecordsBuilder 。
⑩:当一批中的数据量满了,MemoryRecordsBuilder 校验空间不足时,返回RecordAccumulator,再次重新分配新的缓存。
⑪:空间足够,append() 是不同参数的封装方法,可以进行下一步操作。
⑫:校验数据的合法性。
⑬:DefaultRecord 封装了一些静态方法,方便写数据到outStream中
⑭:调用ByteUtil 方法,执行真正的写数据到 outStream 中。
⑮:得到最终的,持有一批数据的 DataOutputStream 。
总结
目前看到的是 , BufferPool 分配了一个 和Key,Value一样大小的ByteBuffer 。 但是key和value没有写到 ByteBuffer 中。而是写到了 DataOutputStream 。
那么这个 BufferPool 到底是何用呢?
TODO:
- ByteBufferOutputStream 到底是何用?
- DataOutputStream 真是的最终持有数据了吗?
后记 2020.09.10
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic >= 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
if (isTransactional)
throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
}
this.magic = magic;
this.timestampType = timestampType;
this.compressionType = compressionType;
this.baseOffset = baseOffset;
this.logAppendTime = logAppendTime;
this.numRecords = 0;
this.uncompressedRecordsSizeInBytes = 0;
this.actualCompressionRatio = 1;
this.maxTimestamp = RecordBatch.NO_TIMESTAMP;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
this.isControlBatch = isControlBatch;
this.partitionLeaderEpoch = partitionLeaderEpoch;
this.writeLimit = writeLimit;
this.initialPosition = bufferStream.position();
this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);
bufferStream.position(initialPosition + batchHeaderSizeInBytes);
//ByteBufferOutputStream , 里面持有了 byteBuffer
this.bufferStream = bufferStream;
// DataOutputStream 里持有的是 ByteBufferOutputStream
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}
之前看漏了 , DataOutputStream 里 持有的就是 ByteBufferOutputStream, 即 缓存的 byteBuffer 。 所以 往 DataOutputStream 里写数据, 就是往缓存写数据。
不能漏一丝细节!
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的