KafkaProducer之 RecordAccumulator
2020-09-14 本文已影响0人
_孙行者_
写缓存相关
append() 方法
org.apache.kafka.clients.producer.internals.RecordAccumulator#append()
/**
* 添加 record 到 accumulator,并返回结果
* 返回结果是带有 future metadata。并且带有标识,添加的 batch 是否满了,或者是不是新创建的。
*/
public RecordAppendResult append(TopicPartition tp, // topic和partition的绑定
long timestamp,//指定时间戳
byte[] key, //key
byte[] value, //value
Header[] headers, //headers
Callback callback, //回调方法
long maxTimeToBlock, //最大等待时长
boolean abortOnNewBatch, //在要创建新的 batch时,是否放弃
long nowMs) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
//记录进行中的线程数
appendsInProgress.incrementAndGet();
//接收返回的buffer
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS; //初始化一个 空的 headers
try {
// check if we have an in-progress batch
// 取当前topicPartition 的 deque,没有的话,创建一个
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
//尝试下写record 到 buffer 里
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
// 写成功了的,就结束了
return appendResult;
}
//当前已有的batch写失败了,不满足了,需要创建新的 batch了
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// 不创建的时候,返回失败的内容
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true);
}
// 取魔数,即 api 的版本号,这个是用来跟 broker 端是接口互通的
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// 不够一个 batch ,也取一个 batch的大小,后面再来的可以往一块塞
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
//分配内存
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
nowMs = time.milliseconds();
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
// 校验 closed
if (closed)
throw new KafkaException("Producer closed while send in progress");
// 再次尝试写 record 到 buffer
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
// 写成功了的,返回
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
//还是写失败了,再通过buffer构建 MemoryRecordsBuilder
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
// 构建 batch
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
// 再次写record 到 缓存中
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
// 把 batch 加到 deque 中
dq.addLast(batch);
// 把 batch 加到 incomplete 中
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
// 这里是个小技巧,已初始化的 buffer , 已经保存到 MemoryRecordsBuilder中了,把这里的 buffer 指针,指向 null
// 这样就确保后面的 final 不会释放了 这个buffer 。
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
if (buffer != null)
// 分配了内存,但是没用上 ,再释放了
free.deallocate(buffer);
// 流程结束了,数量减 1
appendsInProgress.decrementAndGet();
}
}
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
// 构建 MemoryRecordsBuilder
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
/**
* 尝试添加 record 到 ProducerBatch 中
*
* 如果deque 里的 batch 都 满了 ,返回 null 。
* 如果刚好 lastBatch 满了 , 那么把 lastBatch 标识为 关了。不再写了
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
// 有batch , 往lastBatch 写数据
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null)
// lastBatch 空间不足了,把 lastBatch 关了。
last.closeForRecordAppends();
else
// 写record 成功了,返回带 future的结果
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
}
// deque 里没有 batch ,返回 null
return null;
}
KafkaProducer-RecordAccumulate-append.png
发送 ( sender ) 相关
ready() 方法,有Node准备好可以发送了
/**
* 这个方法拿的是节点(Node),并不关心当前的数据
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
* partition batches.
* 获取已经准备好可以发送的Node节点,及相关partition,对于最早的那些,还没准备好的(non-sendable)partition 也会设置为 ready。
* 还有一些已经准备好的,但缺少leader的 partition,也会在 return 结果中。
* <p>
* A destination node is ready to send data if:
* 一个节点是否准备好可以发送数据,需要具体以下的条件。
* <ol>
* <li>There is at least one partition that is not backing off its send
* 至少有一个partition 没有在等待中,也就是说至少有一个 partition的数据,已经写缓存写完了。
* <li><b>and</b> those partitions are not muted (to prevent reordering if
* {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
* is set to one)</li>
* 并且 这些已经写完缓存的 partition 不是 静默处理的。也就是说可以发送的。
* <li><b>and <i>any</i></b> of the following are true</li>
* 以下条件,有一个是true就可以发送了
* <ul>
* <li>The record set is full</li> // 缓存满了
* <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
* // 缓存还没满,但是已经等了 lingerMs 的时间了
* <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
* are immediately considered ready).</li>accumulator 的缓存池已经满了,再写不下数据了,写缓存的线程都在阻塞了
*
* <li>The accumulator has been closed</li> accumulator 被关了
* </ul>
* </ol>
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// free是否被用光了。queued > 0 ,表示有等待写free的线程,free就满了
boolean exhausted = this.free.queued() > 0;
//遍历数据 TopicPartition的ProducerBatch
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
//deque会同时存在写和读两种场景,所以必须得加锁
synchronized (deque) {
// When producing to a large number of partitions, this path is hot and deques are often empty.
// We check whether a batch exists first to avoid the more expensive checks whenever possible.
// 这里是一个优化动作, 之前是 leader 和 deque 同时判断. 但发现大部分情况都是失败在 deque 这里了, leader 判断用不到 . 优化做了个拆分,先判断 deque , 再判断leader
// 这里的意思是,deque是取第一个作为代表了,有数据就有数据,没数据就没数据。
// 因为是锁内操作,如果全遍历的话,操作比较重,性能不行。
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
//有数据要发送,取 partition
TopicPartition part = entry.getKey();
//从集群中,取partition的leader
Node leader = cluster.leaderFor(part);
if (leader == null) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
// 异常情况,没有 leader ,远端服务异常了,但数据已经准备好可以发送了,
// 将情况作为返回值抛出去,让sender去决定
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part)) {
//leader 节点是准备好可以发送的,那么判断下
//batch从最后一次写数据,到当前所经历的时间,
long waitedTimeMs = batch.waitedTimeMs(nowMs);
//是否是等待中的 batch
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// 是等待中的,1:等待重试的,取重试的时间。2:等待数据的,取等待数据的时间
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
//batch 是否满了,deque.size() > , 说明有多个 batch,当前这个batch 就算没满,也装不下更多的数据了。 || batch 真的满了。
boolean full = deque.size() > 1 || batch.isFull();
// 等待是否超时了
boolean expired = waitedTimeMs >= timeToWaitMs;
// batch 满了,超时了,free用光了, RecordAccumulator 被关了,有线程正在等待清空缓存,发送数据
boolean sendable = full || expired || exhausted || closed || flushInProgress();
// 可发送,不在等待中。决定了,就你了。去吧皮卡丘!
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
//不可发送,或者还在等待中
// 有正在等待中的,计算下还要剩余等待的时长
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
// 可能当前还不能发送的 partition , 过会就有可以发送的了 . 所以这里会重复重置最短的等待时间 .
// 取最短的等待时间睡眠, 然后再起来取数据, 再睡眠,再起来... 一直循环.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
RecordAccumulator-ready.png
ready的条件比较多,具体看图吧。
drain() 方法,取缓存数据
/**
* 把给定的 节点 (Node) 的数据给 Drain (抽干,取完) 了
* 在进行中的时候,别的方法不能操作这个节点,禁掉(Mute)
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
//遍历所有可发送的Node ,取出Node上所有相关的数据
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
//正常取出,放到结果集中
batches.put(node.id(), ready);
}
return batches;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
//从 cluster 中取出 node中的所有 partition
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
// 为了防止饥饿的场景,循环不是从0开始的,为什么会有饥饿场景 ???
int start = drainIndex = drainIndex % parts.size();
do {
// 取partition
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
//下一次遍历的index
this.drainIndex = (this.drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp))
//正在读写数据中的,跳过
continue;
//取到deque
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
//已经空了,跳过
continue;
// deque的所有操作,都要先加锁
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
// 拿第一个,判断一下
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;
// first != null
// 是否等待中的batch
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
// 是等待中的,跳过
continue;
// 大小超出 一次请求的最大限制
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// 那就这样吧,一次请求满了为止
// there is a rare case that a single batch size is larger than the request size due to
// compression; in this case we will still eventually send this batch in a single request
break;
} else {
// 当前在事务中的,事务还没完的,不抽数据
if (shouldStopDrainBatchesForPartition(first, tp))
break;
//当前是否开启了事务
boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
// 真正从 deque 里取出 batch , poll出来
// 每个 deque 里只取第一个,
// 取数据是从deque的第一个取,写数据是从deque的最后一个写,所以deque需要一个双向队列
ProducerBatch batch = deque.pollFirst();
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// 处理事务相关的,关于batch的顺序
// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
// batch 关了,不再读写数据
batch.close();
// 累加 batch 里的数据量大小
size += batch.records().sizeInBytes();
// batch 加到待发送的一批里
ready.add(batch);
// 标识抽完数据的时间
batch.drained(now);
}
}
} while (start != drainIndex);
return ready;
}
RecordAccumulator-drain.png
如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的