How Flink Processes Barriers, Start to Finish

2020-07-03  shengjk1

Previous posts covered the full flow of Flink checkpoint barriers and the full flow of how Flink consumes messages.

Categories

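Flink handles barriers in one of two ways:

  1. with barrier alignment
  2. without barrier alignment

Each mode has its own handler class (the original post showed them in a figure).

We use BarrierBuffer (barrier alignment) as the example.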

Walkthrough

The key is the getNextNonBlocked method:


    @Override
    // fetches data from the ResultSubPartition and handles barriers
    public BufferOrEvent getNextNonBlocked() throws Exception {
        while (true) {
            // process buffered BufferOrEvents before grabbing new ones
            Optional<BufferOrEvent> next;
                // currentBuffered is non-null only after the barrier block has been released; in every other case it is null
            if (currentBuffered == null) {
                // no cached data (cached via CachedBufferBlocker) is waiting to be replayed, so read from the InputGate;
                // the inputGate's input channels fetch the data from the ResultSubPartition
                next = inputGate.getNextBufferOrEvent();
            }else {
                // after the barrier block is released, next holds a non-null value until the cached data is drained
                next = Optional.ofNullable(currentBuffered.getNext());
                if (!next.isPresent()) {
                    // the cached data has been fully consumed
                    completeBufferedSequence();
                    return getNextNonBlocked();
                }
            }

            if (!next.isPresent()) {
                if (!endOfStream) {
                    // end of input stream. stream continues with the buffered data
                    endOfStream = true;
                    releaseBlocksAndResetBarriers();
                    return getNextNonBlocked();
                }
                else {
                    // final end of both input and buffered data
                    return null;
                }
            }
            
            // once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
            BufferOrEvent bufferOrEvent = next.get();
            if (isBlocked(bufferOrEvent.getChannelIndex())) {
                // if the channel is blocked, we just store the BufferOrEvent
                // barrier alignment: cache the data
                bufferBlocker.add(bufferOrEvent);
                checkSizeLimit();
            }
            else if (bufferOrEvent.isBuffer()) {
                return bufferOrEvent;
            }
            // handle the barrier
            else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    // every operator except the trigger task takes its checkpoint here;
                    // only a barrier consumed via processInput has actually passed through the upstream operator
                    processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
                }
            }
            else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
            }
            else {
                if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                    processEndOfPartition();
                }
                return bufferOrEvent;
            }
        }
    }

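As long as no barrier alignment has just completed, currentBuffered == null (currentBuffered holds the cached data that is currently being replayed). When the fetched element is a data buffer, it is consumed normally and follows the full Flink message-consumption flow. When it is a barrier, barrier processing starts: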

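    // An operator must receive the barrier with the same id from every input channel before it can start its own checkpoint.
    // If one channel is ahead of the others, the data that follows its barrier has to be cached.
    // If an input channel has been closed, it will not send any more barriers.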
    private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
        final long barrierId = receivedBarrier.getId();

        // fast path for single channel cases
        if (totalNumberOfInputChannels == 1) {
            if (barrierId > currentCheckpointId) {
                // new checkpoint
                currentCheckpointId = barrierId;
                // trigger the checkpoint
                notifyCheckpoint(receivedBarrier);
            }
            return;
        }

        // -- general code path for multiple input channels --
        // this is the second or a later barrier of the current alignment
        if (numBarriersReceived > 0) {
            // this is only true if some alignment is already in progress and was not canceled

            if (barrierId == currentCheckpointId) {
                // regular case
                // block the channel at channelIndex, which boils down to blockedChannels[channelIndex] = true;
                onBarrier(channelIndex);
            }else if (barrierId > currentCheckpointId) {
                // we did not complete the current checkpoint, another started before
                LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                        "Skipping current checkpoint.",
                    inputGate.getOwningTaskName(),
                    barrierId,
                    currentCheckpointId);

                // let the task know we are not completing this
                notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

                // abort the current checkpoint
                releaseBlocksAndResetBarriers();

                // begin the new checkpoint
                beginNewAlignment(barrierId, channelIndex);
            }else {
                // ignore trailing barrier from an earlier checkpoint (obsolete now)
                return;
            }
        }else if (barrierId > currentCheckpointId) {
            // first barrier of a new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }else {
            // either the current checkpoint was canceled (numBarriers == 0) or
            // this barrier is from an old subsumed checkpoint
            return;
        }

        // check if we have all barriers - since canceled checkpoints always have zero barriers
        // this can only happen on a non canceled checkpoint
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            // actually trigger checkpoint
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                    inputGate.getOwningTaskName(),
                    receivedBarrier.getId(),
                    receivedBarrier.getTimestamp());
            }

            releaseBlocksAndResetBarriers();
            // once all barriers have been received, notifyCheckpoint() is called,
            // which in turn calls the StreamTask's triggerCheckpointOnBarrier, as for the earlier operators
            notifyCheckpoint(receivedBarrier);
        }
    }

numBarriersReceived starts at 0, so when the first barrier of a new checkpoint arrives, execution enters beginNewAlignment:

private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
        currentCheckpointId = checkpointId;
        // numBarriersReceived++ and mark the channel at channelIndex as blocked
        onBarrier(channelIndex);

        startOfAlignmentTimestamp = System.nanoTime();

        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Starting stream alignment for checkpoint {}.",
                inputGate.getOwningTaskName(),
                checkpointId);
        }
    }

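When further barriers with the same id arrive, barrierId == currentCheckpointId holds and each one blocks its channel. Once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint is triggered, and it reports the bytes buffered during alignment and the alignment duration. (A walkthrough of the full checkpoint flow will follow in a later post.)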
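
The branch structure of processBarrier for the multi-channel case can be condensed into a small decision function. The following is a minimal sketch, not Flink code: the BarrierAction enum, the BarrierDecision class and its method names are made up for illustration, and the single-channel fast path is left out.

```java
/** Possible reactions to an incoming barrier (hypothetical names, for illustration only). */
enum BarrierAction { START_ALIGNMENT, BLOCK_CHANNEL, ABORT_AND_RESTART, IGNORE }

final class BarrierDecision {
    private BarrierDecision() {}

    /** Mirrors the if/else chain of processBarrier for more than one input channel. */
    static BarrierAction decide(int numBarriersReceived, long barrierId, long currentCheckpointId) {
        if (numBarriersReceived > 0) {
            // an alignment is already in progress
            if (barrierId == currentCheckpointId) {
                return BarrierAction.BLOCK_CHANNEL;      // regular case
            } else if (barrierId > currentCheckpointId) {
                return BarrierAction.ABORT_AND_RESTART;  // a newer checkpoint overtook the current one
            } else {
                return BarrierAction.IGNORE;             // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            return BarrierAction.START_ALIGNMENT;        // first barrier of a new checkpoint
        } else {
            return BarrierAction.IGNORE;                 // canceled or already subsumed checkpoint
        }
    }

    /** True once every channel that is still open has delivered its barrier. */
    static boolean allBarriersReceived(int numBarriersReceived, int numClosedChannels, int totalChannels) {
        return numBarriersReceived + numClosedChannels == totalChannels;
    }
}
```

Writing the decision as a pure function makes the four outcomes easy to check in isolation; in the real method the same branches also mutate the handler's state.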

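If the barriers on other channels are delayed, i.e. numBarriersReceived + numClosedChannels != totalNumberOfInputChannels, the data arriving on channels that have already delivered their barrier goes into bufferBlocker.
bufferBlocker stores this data in ArrayDeque<BufferOrEvent> currentBuffers, so by default bufferBlocker.currentBuffers can grow without bound. The checkSizeLimit() call after each add is where a configured limit, if there is one, gets enforced.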
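
To make the unbounded growth concrete, here is a minimal sketch of an alignment cache with an optional byte limit. BoundedAlignmentCache and all of its members are hypothetical names chosen for this example; it only illustrates the idea of tracking cached bytes against a limit and is not how Flink's own classes are written.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for an alignment cache; not part of Flink's API. */
final class BoundedAlignmentCache {
    private final ArrayDeque<byte[]> currentBuffers = new ArrayDeque<>();
    private final long maxBufferedBytes; // assumption of this sketch: <= 0 means "no limit"
    private long bytesBlocked;

    BoundedAlignmentCache(long maxBufferedBytes) {
        this.maxBufferedBytes = maxBufferedBytes;
    }

    /** Caches one buffer that arrived on a blocked channel. */
    void add(byte[] buffer) {
        currentBuffers.add(buffer);
        bytesBlocked += buffer.length;
    }

    /** Returns true only when a limit is set and the cached bytes exceed it. */
    boolean exceedsLimit() {
        return maxBufferedBytes > 0 && bytesBlocked > maxBufferedBytes;
    }

    long bytesBlocked() {
        return bytesBlocked;
    }

    int size() {
        return currentBuffers.size();
    }
}
```

With no limit the cache simply keeps growing while one channel waits for the others, which is why a slow channel during alignment can translate into memory or spill pressure.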

When numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, releaseBlocksAndResetBarriers() runs first and notifyCheckpoint runs after it.
The main purpose of releaseBlocksAndResetBarriers is to make the cached data the next thing to be consumed.

/**
     * Releases the blocks on all channels and resets the barrier count.
     * Makes sure the just written data is the next to be consumed.
     */
    // hands the data (BufferOrEvent) cached in bufferBlocker over to currentBuffered
    private void releaseBlocksAndResetBarriers() throws IOException {
        LOG.debug("{}: End of stream alignment, feeding buffered data back.",
            inputGate.getOwningTaskName());

        for (int i = 0; i < blockedChannels.length; i++) {
            blockedChannels[i] = false;
        }

        if (currentBuffered == null) {
            // common case: no more buffered data
            currentBuffered = bufferBlocker.rollOverReusingResources();
            if (currentBuffered != null) {
                currentBuffered.open();
            }
        }else {
            // uncommon case: buffered data pending
            // push back the pending data, if we have any
            LOG.debug("{}: Checkpoint skipped via buffered data:" +
                    "Pushing back current alignment buffers and feeding back new alignment data first.",
                inputGate.getOwningTaskName());

            // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
            BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
            if (bufferedNow != null) {
                bufferedNow.open();
                queuedBuffered.addFirst(currentBuffered);
                numQueuedBytes += currentBuffered.size();
                currentBuffered = bufferedNow;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Size of buffered data: {} bytes",
                inputGate.getOwningTaskName(),
                currentBuffered == null ? 0L : currentBuffered.size());
        }

        // the next barrier that comes must assume it is the first
        numBarriersReceived = 0;

        if (startOfAlignmentTimestamp > 0) {
            latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
            startOfAlignmentTimestamp = 0;
        }
    }

After releaseBlocksAndResetBarriers has run, currentBuffered != null (provided something was cached), so the next loop iteration takes this branch:

// after the barrier block is released, next holds a non-null value until the cached data is drained
                next = Optional.ofNullable(currentBuffered.getNext());

and then consumes the data directly:

// once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
            BufferOrEvent bufferOrEvent = next.get();
            if (isBlocked(bufferOrEvent.getChannelIndex())) {
                // if the channel is blocked, we just store the BufferOrEvent
                // barrier alignment: cache the data
                bufferBlocker.add(bufferOrEvent);
                checkSizeLimit();
            }
            else if (bufferOrEvent.isBuffer()) {
                return bufferOrEvent;
            }

The cached data keeps being consumed (during this phase nothing more is read from inputGate) until it is exhausted:

next = Optional.ofNullable(currentBuffered.getNext());
                if (!next.isPresent()) {
                    // the cached data has been fully consumed
                    completeBufferedSequence();
                    return getNextNonBlocked();
                }

Once that is done, the state is the same as when execution first reached this point, and the cycle repeats.
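
The whole cycle (block a channel on its barrier, cache what follows, release once every channel has delivered the barrier, replay the cache before reading new input) can be tried out with a toy model. This is a simplified sketch under stated assumptions: Element and ToyAligner are invented for this example, the input is a pre-filled queue rather than an InputGate, and aborted or subsumed checkpoints, closed channels and size limits are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy input element: a data record or a barrier (not Flink's BufferOrEvent). */
final class Element {
    final int channel;
    final String payload;  // null for barriers
    final long barrierId;  // -1 for data records

    private Element(int channel, String payload, long barrierId) {
        this.channel = channel;
        this.payload = payload;
        this.barrierId = barrierId;
    }

    static Element data(int channel, String payload) { return new Element(channel, payload, -1L); }
    static Element barrier(int channel, long id) { return new Element(channel, null, id); }
    boolean isData() { return barrierId < 0; }
}

final class ToyAligner {
    private final ArrayDeque<Element> input = new ArrayDeque<>();  // stands in for inputGate
    private ArrayDeque<Element> cache = new ArrayDeque<>();        // stands in for bufferBlocker
    private ArrayDeque<Element> replay = new ArrayDeque<>();       // stands in for currentBuffered
    private final boolean[] blocked;
    private int numBarriersReceived;
    private long currentCheckpointId = -1L;
    final List<Long> checkpoints = new ArrayList<>();              // ids of completed alignments

    ToyAligner(int numChannels, List<Element> elements) {
        blocked = new boolean[numChannels];
        input.addAll(elements);
    }

    /** Returns the next data payload, or null once input and cache are both exhausted. */
    String next() {
        while (true) {
            // replayed data always wins over fresh input
            Element e = replay.isEmpty() ? input.poll() : replay.poll();
            if (e == null) {
                if (cache.isEmpty()) {
                    return null;
                }
                release();  // end of input: feed the remaining cached data back
                continue;
            }
            if (blocked[e.channel]) {
                cache.add(e);       // channel is waiting for alignment
            } else if (e.isData()) {
                return e.payload;
            } else {
                onBarrier(e);
            }
        }
    }

    private void onBarrier(Element b) {
        if (numBarriersReceived == 0 && b.barrierId > currentCheckpointId) {
            currentCheckpointId = b.barrierId;  // first barrier of a new checkpoint
        } else if (!(numBarriersReceived > 0 && b.barrierId == currentCheckpointId)) {
            return;                             // this sketch ignores every other case
        }
        blocked[b.channel] = true;
        numBarriersReceived++;
        if (numBarriersReceived == blocked.length) {
            release();
            checkpoints.add(currentCheckpointId);
        }
    }

    private void release() {
        Arrays.fill(blocked, false);
        cache.addAll(replay);  // newly cached data first, still-pending replay data after it
        replay = cache;
        cache = new ArrayDeque<>();
        numBarriersReceived = 0;
    }
}
```

With two channels and the arrival order a1, barrier 1 (channel 0), a2, b1, barrier 1 (channel 1), b2, repeated calls to next() return a1, b1, a2, b2: a2 is held back until channel 1 delivers its barrier, and it is replayed before b2 is read from the input.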

Summary
(The original post summarized the process in a figure.)