Kafka程序员文字欲

无镜--kafka之生产者(二)

2018-07-30  本文已影响10人  绍圣

书接上回。在上回中,我们了解了记录收集器周围的生态类,接下来看看生产的消息是如何追加到记录收集器(RecordAccumulator)中。

RecordAccumulator的append方法

public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {

// 条数+1,向收集器添加消息的条数

appendsInProgress.incrementAndGet();

try {

// 每个topicPartition对应一个Deque

// 从batches:ConcurrentMap<TopicPartition,Deque<RecordBatch>>中获取tp的双向队列,不存在的话,就新建一个

Deque dq = getOrCreateDeque(tp);

synchronized (dq) { // 阻塞这个tp的双向队列,确保进行操作时线程安全

if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);

if (appendResult != null) // 添加成功,直接返回

return appendResult;

}

// 为 topic-partition 创建一个新的 RecordBatch, 需要初始化相应的 RecordBatch

int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));

log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());

ByteBuffer buffer = free.allocate(size, maxTimeToBlock); // 给这个 RecordBatch 初始化一个 buffer

synchronized (dq) { // 阻塞这个tp的双向队列,确保进行操作时线程安全

if (closed) // 验证生产者是否已关闭

throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);

if (appendResult != null) { // 添加成功,释放已经分配的ByteBuffer空间 free.deallocate(buffer);

return appendResult;

}

// 添加失败:因为此tp的双向队列最新的RecordBatch已经没有足够的容量来存储这条记录了。所以需要重新建立一个RecordBatch

// 构建一个内存的MemoryRecords(内存记录集)

MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);

// 为添加失败的消息,构建一个RecordBatch

RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

// 向RecordBatch中追加数据

FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

dq.addLast(batch); // 将新RecordBatch添加到对应的tp的双向队列中

// 每个RecordBatch都会被发送到服务端,服务端会响应给客户端

//在Sender线程把生产请求提交到服务端后,服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,那么就从incomplete集合中删除

// IncompleteRecordBatches启到监控RecordBatch的作用:正常提交,超时没有提交到,发现leader不能使用,Sender强制退出会对应做add和remove

incomplete.add(batch);

// 如果 dp.size()>1 就证明这个 Dueue 有一个 batch 是可以发送了

return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);

}

} finally {

appendsInProgress.decrementAndGet();

}

}

1,获取tp对应的双向队列:Deque<RecordBatch>(没有的话,会新建一个)。

2,获取tp对应的双向队列对象的锁,试着追加记录到队列中

3,如果2不成功,使用RecordAccumulator中的BufferPool实例给此条消息申请足够的空间。

4,获取tp对应的双向队列对象的锁,再次试着追加记录到队列中。

5,如果4还是不成功,说明tp的双向队列中最新的RecordBatch已经没有足够的容量来存储这条记录了,所以必须新创建一个RecordBatch,

6,向第5步新建的RecordBatch追加记录。

7,将新创建的RecordBatch追加到p对应的双向队列中。

8,将新创建的RecordBatch保存到存储发送尚未完成的RecordBatch的类中:IncompleteRecordBatches。

9,返回RecordAppendResult实例

IncompleteRecordBatches

incomplete:Set<RecordBatch>:保存发送尚未完成RecordBatch。

这里不会分tp,所以由KafkaProducer产生的记录,最后汇总生成的RecordBatch,在没有发送到服务端,收到服务端的响应之前,都会保存在此集合中(在Sender线程把生产请求提交到服务端后,服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,那么就从incomplete集合中删除)。

试着追加记录的方法:tryAppend

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque deque) {

RecordBatch last = deque.peekLast(); //获取队列尾部元素,队列为空是返回Null

if (last != null) {

FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());

if (future == null) // 没有空间来存储键值对了,关闭MemoryRecords对象,表明已经满了

last.records.close();

else

return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);

}

return null;

}


RecordBatch.tryAppend

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {

if (!this.records.hasRoomFor(key, value)) {

// 如果内存记录MemoryRecords实例records中没有多余的空间直接返回NULL

return null;

} else {

long checksum = this.records.append(offsetCounter++, timestamp, key, value); // 将key、value添加进内存记录MemoryRecords中

this.maxRecordSize = Math.max(this.maxRecordSize,Record.recordSize(key, value));

// 如果需要就更新最大记录大小maxRecordSize this.lastAppendTime = now;

FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length);

if (callback != null)

thunks.add(new Thunk(callback, future));

this.recordCount++;

return future;

}

}

最终追加的消息是存储在MemoryRecords中。前面判断的有无空间来存放追加的消息,也是判断的MemoryRecords中是否有空间。MemoryRecords用来存储消息的是ByteBuffer实例(在BufferPool对象中分配一定空间的ByteBuffer对象)。

RecordAccumulator,topic-partition,RecordBatch,记录(R)的关系


有两个疑问:

第一:在RecordAccumulator的append方法,为什么要试着追加两次记录到RecordBatch中?

思考:在一个KafkaProducer实例中会持有一个RecordAccumulator实例。在并发环境下一个KafkaProducer实例可能会被多个线程所持有。在append中会持有dq的锁,那么在两次append之间可能某一个线程已经释放了持有dq的锁,再者已经走完了第二个锁的逻辑(创建新的RecordBatch空间)。那么线程在获取dq的锁后,如果在试着追加一次记录到RecordBatch中,是有可能会成功的。如果还是不行,再去为此dq创建新的RecordBatch空间。

第二:在第二次试着追加记录到RecordBatch中的时候,为什么要预先分配空间,在加入不成功后又释放空间,而不是确认了第二次也加不成功后,再分配空间。

思考:通过和柳年思水的讨论,觉得以下解释似乎合理:放在锁的外面,是让和发送本身不相关的操作尽量的放在锁外,这样在发送的时候就不会因为获取不到锁而阻塞。

上一篇下一篇

猜你喜欢

热点阅读