kafka客户端发送批量消息流程分析

2020-10-30  本文已影响0人  nanposan

1、根据分区申请内存块

buffer = free.allocate(size, maxTimeToBlock);

2、构造batch对象加入Dequeue

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);

ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);

3、将key、value记录到buffer中

RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);

if (result.batchIsFull || result.newBatchCreated) {

log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

  this.sender.wakeup();  //满足条件后唤醒线程发送batch

}

4、sender线程发送批次

sendProduceRequests(batches, now);

MemoryRecords records = batch.records();  //此处将buffer中的record读取出来

client.send(clientRequest, now);  

上一篇 下一篇

猜你喜欢

热点阅读