KafkaProducer之 RecordAccumulator

2020-09-14  本文已影响0人  _孙行者_

写缓存相关

append() 方法

org.apache.kafka.clients.producer.internals.RecordAccumulator#append()

/**
     * 添加 record 到 accumulator,并返回结果
     * 返回结果是带有 future metadata。并且带有标识,添加的 batch 是否满了,或者是不是新创建的。
     */
    public RecordAppendResult append(TopicPartition tp, // topic和partition的绑定
                                     long timestamp,//指定时间戳
                                     byte[] key,    //key
                                     byte[] value,  //value
                                     Header[] headers,  //headers
                                     Callback callback, //回调方法
                                     long maxTimeToBlock,   //最大等待时长
                                     boolean abortOnNewBatch,   //在要创建新的 batch时,是否放弃
                                     long nowMs) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        //记录进行中的线程数
        appendsInProgress.incrementAndGet();
        //接收返回的buffer
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS; //初始化一个 空的 headers
        try {
            // check if we have an in-progress batch
            // 取当前topicPartition 的 deque,没有的话,创建一个
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                //尝试下写record 到 buffer 里
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null)
                    // 写成功了的,就结束了
                    return appendResult;
            }
            //当前已有的batch写失败了,不满足了,需要创建新的 batch了
            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // 不创建的时候,返回失败的内容
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }

            // 取魔数,即 api 的版本号,这个是用来跟 broker 端是接口互通的
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            // 不够一个 batch ,也取一个 batch的大小,后面再来的可以往一块塞
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            //分配内存
            buffer = free.allocate(size, maxTimeToBlock);

            // Update the current time in case the buffer allocation blocked above.
            nowMs = time.milliseconds();
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                // 校验 closed
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                // 再次尝试写 record 到 buffer
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                if (appendResult != null) {
                    // 写成功了的,返回
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }
                //还是写失败了,再通过buffer构建 MemoryRecordsBuilder
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                // 构建 batch
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                // 再次写record 到 缓存中
                FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                        callback, nowMs));
                // 把 batch 加到 deque 中
                dq.addLast(batch);
                // 把 batch 加到 incomplete 中
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                // 这里是个小技巧,已初始化的 buffer , 已经保存到 MemoryRecordsBuilder中了,把这里的 buffer 指针,指向 null
                // 这样就确保后面的 final 不会释放了 这个buffer 。
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            if (buffer != null)
                // 分配了内存,但是没用上 ,再释放了
                free.deallocate(buffer);
            // 流程结束了,数量减 1
            appendsInProgress.decrementAndGet();
        }
    }

    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
        if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
            throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                "support the required message format (v2). The broker must be version 0.11 or later.");
        }
        // 构建 MemoryRecordsBuilder
        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
    }

    /**
     *  尝试添加 record 到 ProducerBatch 中
     *
     *  如果deque 里的 batch 都 满了 ,返回 null 。
     *  如果刚好 lastBatch 满了 , 那么把 lastBatch 标识为 关了。不再写了
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque, long nowMs) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            // 有batch , 往lastBatch 写数据
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null)
                // lastBatch 空间不足了,把 lastBatch 关了。
                last.closeForRecordAppends();
            else
                // 写record 成功了,返回带 future的结果
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        // deque 里没有 batch ,返回 null
        return null;
    }
KafkaProducer-RecordAccumulate-append.png

发送 ( sender ) 相关

ready() 方法,有Node准备好可以发送了
   /**
     * 这个方法拿的是节点(Node),并不关心当前的数据
     * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
     * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
     * partition batches.
     * 获取已经准备好可以发送的Node节点,及相关partition,对于最早的那些,还没准备好的(non-sendable)partition 也会设置为 ready。
     * 还有一些已经准备好的,但缺少leader的 partition,也会在 return 结果中。
     * <p>
     * A destination node is ready to send data if:
     * 一个节点是否准备好可以发送数据,需要具体以下的条件。
     * <ol>
     * <li>There is at least one partition that is not backing off its send
     * 至少有一个partition 没有在等待中,也就是说至少有一个 partition的数据,已经写缓存写完了。
     * <li><b>and</b> those partitions are not muted (to prevent reordering if
     *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
     *   is set to one)</li>
     *   并且 这些已经写完缓存的 partition 不是 静默处理的。也就是说可以发送的。
     * <li><b>and <i>any</i></b> of the following are true</li>
     * 以下条件,有一个是true就可以发送了
     * <ul>
     *     <li>The record set is full</li> // 缓存满了
     *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
     *     // 缓存还没满,但是已经等了 lingerMs 的时间了
     *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
     *     are immediately considered ready).</li>accumulator 的缓存池已经满了,再写不下数据了,写缓存的线程都在阻塞了
     *
     *     <li>The accumulator has been closed</li> accumulator 被关了
     * </ul>
     * </ol>
     */
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set<String> unknownLeaderTopics = new HashSet<>();
        // free是否被用光了。queued > 0 ,表示有等待写free的线程,free就满了
        boolean exhausted = this.free.queued() > 0;
        //遍历数据 TopicPartition的ProducerBatch
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            Deque<ProducerBatch> deque = entry.getValue();
            //deque会同时存在写和读两种场景,所以必须得加锁
            synchronized (deque) {
                // When producing to a large number of partitions, this path is hot and deques are often empty.
                // We check whether a batch exists first to avoid the more expensive checks whenever possible.
                // 这里是一个优化动作, 之前是 leader 和 deque 同时判断. 但发现大部分情况都是失败在 deque 这里了, leader 判断用不到 . 优化做了个拆分,先判断 deque , 再判断leader
                // 这里的意思是,deque是取第一个作为代表了,有数据就有数据,没数据就没数据。
                // 因为是锁内操作,如果全遍历的话,操作比较重,性能不行。
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    //有数据要发送,取 partition
                    TopicPartition part = entry.getKey();
                    //从集群中,取partition的leader
                    Node leader = cluster.leaderFor(part);
                    if (leader == null) {
                        // This is a partition for which leader is not known, but messages are available to send.
                        // Note that entries are currently not removed from batches when deque is empty.
                        // 异常情况,没有 leader ,远端服务异常了,但数据已经准备好可以发送了,
                        // 将情况作为返回值抛出去,让sender去决定
                        unknownLeaderTopics.add(part.topic());
                    } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                        //leader 节点是准备好可以发送的,那么判断下
                        //batch从最后一次写数据,到当前所经历的时间,
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
                        //是否是等待中的 batch
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        // 是等待中的,1:等待重试的,取重试的时间。2:等待数据的,取等待数据的时间
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        //batch 是否满了,deque.size() > , 说明有多个 batch,当前这个batch 就算没满,也装不下更多的数据了。 || batch 真的满了。
                        boolean full = deque.size() > 1 || batch.isFull();
                        // 等待是否超时了
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        // batch 满了,超时了,free用光了, RecordAccumulator 被关了,有线程正在等待清空缓存,发送数据
                        boolean sendable = full || expired || exhausted || closed || flushInProgress();
                        // 可发送,不在等待中。决定了,就你了。去吧皮卡丘!
                        if (sendable && !backingOff) {
                            readyNodes.add(leader);
                        } else {
                            //不可发送,或者还在等待中
                            // 有正在等待中的,计算下还要剩余等待的时长
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            // 可能当前还不能发送的 partition , 过会就有可以发送的了 . 所以这里会重复重置最短的等待时间 .
                            // 取最短的等待时间睡眠, 然后再起来取数据, 再睡眠,再起来... 一直循环.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

RecordAccumulator-ready.png

ready的条件比较多,具体看图吧。

drain() 方法,取缓存数据

    /**
     * 把给定的 节点 (Node) 的数据给 Drain (抽干,取完) 了
     * 在进行中的时候,别的方法不能操作这个节点,禁掉(Mute)
     */
    public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();


        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            //遍历所有可发送的Node ,取出Node上所有相关的数据
            List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
            //正常取出,放到结果集中
            batches.put(node.id(), ready);
        }
        return batches;
    }

    private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        int size = 0;
        //从 cluster 中取出 node中的所有 partition
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        // 为了防止饥饿的场景,循环不是从0开始的,为什么会有饥饿场景 ???
        int start = drainIndex = drainIndex % parts.size();
        do {
            // 取partition
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            //下一次遍历的index
            this.drainIndex = (this.drainIndex + 1) % parts.size();

            // Only proceed if the partition has no in-flight batches.
            if (isMuted(tp))
                //正在读写数据中的,跳过
                continue;
            //取到deque
            Deque<ProducerBatch> deque = getDeque(tp);
            if (deque == null)
                //已经空了,跳过
                continue;
            // deque的所有操作,都要先加锁
            synchronized (deque) {
                // invariant: !isMuted(tp,now) && deque != null
                // 拿第一个,判断一下
                ProducerBatch first = deque.peekFirst();
                if (first == null)
                    continue;

                // first != null
                // 是否等待中的batch
                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                // Only drain the batch if it is not during backoff period.
                if (backoff)
                    // 是等待中的,跳过
                    continue;

                // 大小超出 一次请求的最大限制
                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                    // 那就这样吧,一次请求满了为止
                    // there is a rare case that a single batch size is larger than the request size due to
                    // compression; in this case we will still eventually send this batch in a single request
                    break;
                } else {
                    // 当前在事务中的,事务还没完的,不抽数据
                    if (shouldStopDrainBatchesForPartition(first, tp))
                        break;

                    //当前是否开启了事务
                    boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
                    ProducerIdAndEpoch producerIdAndEpoch =
                        transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                    // 真正从 deque 里取出 batch , poll出来
                    // 每个 deque 里只取第一个,
                    // 取数据是从deque的第一个取,写数据是从deque的最后一个写,所以deque需要一个双向队列
                    ProducerBatch batch = deque.pollFirst();
                    if (producerIdAndEpoch != null && !batch.hasSequence()) {
                        // 处理事务相关的,关于batch的顺序
                        // If the batch already has an assigned sequence, then we should not change the producer id and
                        // sequence number, since this may introduce duplicates. In particular, the previous attempt
                        // may actually have been accepted, and if we change the producer id and sequence here, this
                        // attempt will also be accepted, causing a duplicate.
                        //
                        // Additionally, we update the next sequence number bound for the partition, and also have
                        // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                        // even if we receive out of order responses.
                        batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                        transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                        log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                                "{} being sent to partition {}", producerIdAndEpoch.producerId,
                            producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                        transactionManager.addInFlightBatch(batch);
                    }
                    // batch 关了,不再读写数据
                    batch.close();
                    // 累加 batch 里的数据量大小
                    size += batch.records().sizeInBytes();
                    // batch 加到待发送的一批里
                    ready.add(batch);
                    // 标识抽完数据的时间
                    batch.drained(now);
                }
            }
        } while (start != drainIndex);
        return ready;
    }
RecordAccumulator-drain.png

如果文章有帮助到您,请点个赞,您的反馈会让我感到文章是有价值的

上一篇下一篇

猜你喜欢

热点阅读