KafkaProducer
消息的流动
一个请求的发送分为下面几步:
其中,Sender把准备好的Batch取出,把要发往同一Node的Batch放在一起,发给NetworkClient
内存管理的实现
管理一组poolableSize的内存,以便减少gc,增快效率。
消息是如何累加到Batch的
在写模式的ByteBuffer上叠加输出流,输出完成后转为读模式。
ProducerBatch维护一个MemoryRecordsBuilder,向其中写入记录。MemoryRecordsBuilder对ByteBuffer写入多条记录,再赋给MemoryRecords供读取。
MemoryRecordsBuilder显然是建造者模式。接受一个ByteBuffer,在其上开启ByteBufferOutputStream,并叠加压缩流。用法: 构造函数->closeForRecordAppends->build, 先用hasRoomFor/isFull判断是否可写入
- 构造函数中会开启流
- build会调用close后,返回
builtRecords
- close将ByteBuffer转为读模式,赋给MemoryRecords,并赋给
builtRecords
- close将ByteBuffer转为读模式,赋给MemoryRecords,并赋给
请求的发送和响应是如何实现的
请求在发送时,在组件链中一路向前传递,而调用方线程(如果是get调用)会阻塞等待调用完成。那么当NetworkClient收到响应后,需要释放Batch的内存、控制对应请求的调用方线程继续运行、调用拦截器的回调,如何做到呢?
回调与InFligh机制[1] 。
回调
RequestCompletionHandler
NetworkClient收到响应后会回调Response,处理后续工作。
Sender创建ClientRequest时,会传入回调函数RequestCompletionHandler,在其中定义了释放ByteBuffer的逻辑。
在Sender::sendProduceRequest中定义了回调函数,该回调再NetworkClient收到请求时会调用:
一路调用至此,调用batch.done让调用方线程继续、调用回调。
另外移除inFlightBatches,移除incomplete(与Inflight机制有关,见下文)、释放内存
InterceptorCallback
ProducerBatch创建时,会创建ProduceRequestResult,后者维护了一个CountDownLatch,在ProduceRequestResult上调用的await和done都会转发到其上。用于控制客户端的流程。
ProducerBatch添加记录时,会把InterceptorCallback,拦截器的回调也加入
在ProducerBatch完成时,Sender::completeBatch->ProducerBatch::done调用该方法,拦截器回调和ProduceRequestResult都会被调用
调用方流程控制
利用ProducerBatch的ProduceRequestResult变量实现。ProduceRequestResult维护了一个CountDownLatch,外接调用get时阻塞于此。
当响应收到后,Sender调用ProducerBatch::done
InFlight机制
InFlight机制是我临时发明的概念,代表发送后等待处理的请求/Batch,实现这样机制的可以叫做InFlight结构体。
NetworkClient中的InFlightRequests、Sender中的
inFlightBatches
变量和RecordAccumulator中的IncompleteBatches,都用到了这个机制。
请求在被传往下一个组件前,会先以某种标识存于这样的结构体,当响应收到时,又按标识把对应的请求取出,处理对应的逻辑。
InFlightRequests
NetworkClient用到的InFlightRequests中维护了一个Map,代表等待处理的请求。
消息发送时,NetworkClient::doSend会调用该方法,把请求按目标节点存于队列
当收到响应后,NetworkClient::handleCompletedReceives会调用InflightRequests::completeNext,按发送节点将队首请求取出,生成ClientResponse,完成之后的逻辑。
注意到请求发送后会按节点存在队列,收到响应后直接取出对应节点的队首,这是因为服务端保障了一个机制: "请求一定按顺序被响应,先发送的请求一定先响应"。所以尽管发往同一个节点的操作可能应用于不同partition(多个partition的leader都在一个节点上是可能的),它们的响应一定是按顺序返回的。
inFlightBatches
Sender维护了一个inFlightBatches
,代表"等待完成的Batch"。所有发送出去的请求,在还没收到响应前都存于此。
// Sender.java
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;
当NetworkClient收到响应后,对应的Batch就完成处理了,Sender的该方法会被调用,将该Batch移除。
incomplete
在RecordAccumulator::append中,内存被申请,该Batch被添加到incomplete。
在响应收到后,RecordAccumulator::deallocate被调用,移除对应的Batch,释放内存
-
Sender::sendProduceRequest中定义的RequestCompletionHandler ↩