无镜--kafka之生产者(二)
书接上回。在上回中,我们了解了记录收集器周围的生态类,接下来看看生产的消息是如何追加到记录收集器(RecordAccumulator)中。
RecordAccumulator的append方法
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {
// 条数+1,向收集器添加消息的条数
appendsInProgress.incrementAndGet();
try {
// 每个topicPartition对应一个Deque
// 从batches:ConcurrentMap<TopicPartition,Deque<RecordBatch>>中获取tp的双向队列,不存在的话,就新建一个
Deque dq = getOrCreateDeque(tp);
synchronized (dq) { // 阻塞这个tp的双向队列,确保进行操作时线程安全
if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) // 添加成功,直接返回
return appendResult;
}
// 为 topic-partition 创建一个新的 RecordBatch, 需要初始化相应的 RecordBatch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size, maxTimeToBlock); // 给这个 RecordBatch 初始化一个 buffer
synchronized (dq) { // 阻塞这个tp的双向队列,确保进行操作时线程安全
if (closed) // 验证生产者是否已关闭
throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) { // 添加成功,释放已经分配的ByteBuffer空间 free.deallocate(buffer);
return appendResult;
}
// 添加失败:因为此tp的双向队列最新的RecordBatch已经没有足够的容量来存储这条记录了。所以需要重新建立一个RecordBatch
// 构建一个内存的MemoryRecords(内存记录集)
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
// 为添加失败的消息,构建一个RecordBatch
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
// 向RecordBatch中追加数据
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
dq.addLast(batch); // 将新RecordBatch添加到对应的tp的双向队列中
// 每个RecordBatch都会被发送到服务端,服务端会响应给客户端
//在Sender线程把生产请求提交到服务端后,服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,那么就从incomplete集合中删除
// IncompleteRecordBatches启到监控RecordBatch的作用:正常提交,超时没有提交到,发现leader不能使用,Sender强制退出会对应做add和remove
incomplete.add(batch);
// 如果 dp.size()>1 就证明这个 Dueue 有一个 batch 是可以发送了
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}
} finally {
appendsInProgress.decrementAndGet();
}
}
1,获取tp对应的双向队列:Deque<RecordBatch>(没有的话,会新建一个)。
2,获取tp对应的双向队列对象的锁,试着追加记录到队列中。
3,如果2不成功,使用RecordAccumulator中的BufferPool实例给此条消息申请足够的空间。
4,获取tp对应的双向队列对象的锁,再次试着追加记录到队列中。
5,如果4还是不成功,说明tp的双向队列中最新的RecordBatch已经没有足够的容量来存储这条记录了,所以必须新创建一个RecordBatch,
6,向第5步新建的RecordBatch追加记录。
7,将新创建的RecordBatch追加到p对应的双向队列中。
8,将新创建的RecordBatch保存到存储发送尚未完成的RecordBatch的类中:IncompleteRecordBatches。
9,返回RecordAppendResult实例
IncompleteRecordBatches
incomplete:Set<RecordBatch>:保存发送尚未完成RecordBatch。
这里不会分tp,所以由KafkaProducer产生的记录,最后汇总生成的RecordBatch,在没有发送到服务端,收到服务端的响应之前,都会保存在此集合中(在Sender线程把生产请求提交到服务端后,服务端正常响应给客户端,客户端在处理响应的回调函数中,如果是成功提交到服务端,那么就从incomplete集合中删除)。
试着追加记录的方法:tryAppend
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque deque) {
RecordBatch last = deque.peekLast(); //获取队列尾部元素,队列为空是返回Null
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null) // 没有空间来存储键值对了,关闭MemoryRecords对象,表明已经满了
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}
RecordBatch.tryAppend
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
if (!this.records.hasRoomFor(key, value)) {
// 如果内存记录MemoryRecords实例records中没有多余的空间直接返回NULL
return null;
} else {
long checksum = this.records.append(offsetCounter++, timestamp, key, value); // 将key、value添加进内存记录MemoryRecords中
this.maxRecordSize = Math.max(this.maxRecordSize,Record.recordSize(key, value));
// 如果需要就更新最大记录大小maxRecordSize this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}
最终追加的消息是存储在MemoryRecords中。前面判断的有无空间来存放追加的消息,也是判断的MemoryRecords中是否有空间。MemoryRecords用来存储消息的是ByteBuffer实例(在BufferPool对象中分配一定空间的ByteBuffer对象)。
RecordAccumulator,topic-partition,RecordBatch,记录(R)的关系
有两个疑问:
第一:在RecordAccumulator的append方法,为什么要试着追加两次记录到RecordBatch中?
思考:在一个KafkaProducer实例中会持有一个RecordAccumulator实例。在并发环境下一个KafkaProducer实例可能会被多个线程所持有。在append中会持有dq的锁,那么在两次append之间可能某一个线程已经释放了持有dq的锁,再者已经走完了第二个锁的逻辑(创建新的RecordBatch空间)。那么线程在获取dq的锁后,如果在试着追加一次记录到RecordBatch中,是有可能会成功的。如果还是不行,再去为此dq创建新的RecordBatch空间。
第二:在第二次试着追加记录到RecordBatch中的时候,为什么要预先分配空间,在加入不成功后又释放空间,而不是确认了第二次也加不成功后,再分配空间。
思考:通过和柳年思水的讨论,觉得以下解释似乎合理:放在锁的外面,是让和发送本身不相关的操作尽量的放在锁外,这样在发送的时候就不会因为获取不到锁而阻塞。