KafkaProducer

2020-06-12  本文已影响0人  不存在的里皮

消息的流动

一个请求的发送分为下面几步:


其中,Sender把准备好的Batch取出,把要发往同一Node的Batch放在一起,发给NetworkClient

内存管理的实现

管理一组poolableSize的内存,以便减少gc,增快效率。

消息是如何累加到Batch的

在写模式的ByteBuffer上叠加输出流,输出完成后转为读模式。

ProducerBatch维护一个MemoryRecordsBuilder,向其中写入记录。MemoryRecordsBuilder对ByteBuffer写入多条记录,再赋给MemoryRecords供读取。
MemoryRecordsBuilder显然是建造者模式。接受一个ByteBuffer,在其上开启ByteBufferOutputStream,并叠加压缩流。用法: 构造函数->closeForRecordAppends->build, 先用hasRoomFor/isFull判断是否可写入

请求的发送和响应是如何实现的

请求在发送时,在组件链中一路向前传递,而调用方线程(如果是get调用)会阻塞等待调用完成。那么当NetworkClient收到响应后,需要释放Batch的内存、控制对应请求的调用方线程继续运行、调用拦截器的回调,如何做到呢?

回调InFligh机制[1]

回调

RequestCompletionHandler
NetworkClient收到响应后会回调Response,处理后续工作。
Sender创建ClientRequest时,会传入回调函数RequestCompletionHandler,在其中定义了释放ByteBuffer的逻辑。

在Sender::sendProduceRequest中定义了回调函数,该回调再NetworkClient收到请求时会调用:



一路调用至此,调用batch.done让调用方线程继续、调用回调。
另外移除inFlightBatches,移除incomplete(与Inflight机制有关,见下文)、释放内存


InterceptorCallback
ProducerBatch创建时,会创建ProduceRequestResult,后者维护了一个CountDownLatch,在ProduceRequestResult上调用的await和done都会转发到其上。用于控制客户端的流程

ProducerBatch添加记录时,会把InterceptorCallback,拦截器的回调也加入


在ProducerBatch完成时,Sender::completeBatch->ProducerBatch::done调用该方法,拦截器回调和ProduceRequestResult都会被调用

调用方流程控制

利用ProducerBatch的ProduceRequestResult变量实现。ProduceRequestResult维护了一个CountDownLatch,外接调用get时阻塞于此。


当响应收到后,Sender调用ProducerBatch::done

InFlight机制

InFlight机制是我临时发明的概念,代表发送后等待处理的请求/Batch,实现这样机制的可以叫做InFlight结构体。

NetworkClient中的InFlightRequests、Sender中的inFlightBatches变量和RecordAccumulator中的IncompleteBatches,都用到了这个机制。

请求在被传往下一个组件前,会先以某种标识存于这样的结构体,当响应收到时,又按标识把对应的请求取出,处理对应的逻辑。

InFlightRequests
NetworkClient用到的InFlightRequests中维护了一个Map,代表等待处理的请求。


消息发送时,NetworkClient::doSend会调用该方法,把请求按目标节点存于队列

当收到响应后,NetworkClient::handleCompletedReceives会调用InflightRequests::completeNext,按发送节点将队首请求取出,生成ClientResponse,完成之后的逻辑。

注意到请求发送后会按节点存在队列,收到响应后直接取出对应节点的队首,这是因为服务端保障了一个机制: "请求一定按顺序被响应,先发送的请求一定先响应"。所以尽管发往同一个节点的操作可能应用于不同partition(多个partition的leader都在一个节点上是可能的),它们的响应一定是按顺序返回的。

inFlightBatches
Sender维护了一个inFlightBatches,代表"等待完成的Batch"。所有发送出去的请求,在还没收到响应前都存于此。

// Sender.java
// A per-partition queue of batches ordered by creation time for tracking the in-flight batches
private final Map<TopicPartition, List<ProducerBatch>> inFlightBatches;

当NetworkClient收到响应后,对应的Batch就完成处理了,Sender的该方法会被调用,将该Batch移除。


incomplete
在RecordAccumulator::append中,内存被申请,该Batch被添加到incomplete。
在响应收到后,RecordAccumulator::deallocate被调用,移除对应的Batch,释放内存


  1. Sender::sendProduceRequest中定义的RequestCompletionHandler

上一篇下一篇

猜你喜欢

热点阅读